This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git


The following commit(s) were added to refs/heads/main by this push:
     new 7bce1bfd PROTON-2794 Resolve transfer ID wrapping issue in incoming window
7bce1bfd is described below

commit 7bce1bfd4be21ee5e894c57eb84d3791ee4f5caf
Author: Timothy Bish <[email protected]>
AuthorDate: Tue Mar 5 14:17:02 2024 -0500

    PROTON-2794 Resolve transfer ID wrapping issue in incoming window
    
    Ensure that the transfer ID wraps at the max uint value and add some tests
---
 pom.xml                                            |   5 +
 protonj2/pom.xml                                   |   5 +
 .../engine/impl/ProtonSessionIncomingWindow.java   |  36 ++-
 .../protonj2/engine/impl/ProtonReceiverTest.java   |  66 +++++
 .../impl/ProtonSessionIncomingWindowTest.java      | 266 +++++++++++++++++++++
 .../impl/ProtonSessionOutgoingWindowTest.java      |  81 +++++++
 6 files changed, 440 insertions(+), 19 deletions(-)

diff --git a/pom.xml b/pom.xml
index 3f3e31ae..71424e33 100644
--- a/pom.xml
+++ b/pom.xml
@@ -249,6 +249,11 @@
         <artifactId>mockito-core</artifactId>
         <version>${mockito.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.mockito</groupId>
+        <artifactId>mockito-junit-jupiter</artifactId>
+        <version>${mockito.version}</version>
+      </dependency>
     </dependencies>
   </dependencyManagement>
 
diff --git a/protonj2/pom.xml b/protonj2/pom.xml
index 986d2edc..97282731 100644
--- a/protonj2/pom.xml
+++ b/protonj2/pom.xml
@@ -84,6 +84,11 @@
       <artifactId>mockito-core</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-junit-jupiter</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-simple</artifactId>
diff --git 
a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionIncomingWindow.java
 
b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionIncomingWindow.java
index 59bf6be5..d4acc17e 100644
--- 
a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionIncomingWindow.java
+++ 
b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionIncomingWindow.java
@@ -37,7 +37,7 @@ import org.apache.qpid.protonj2.types.transport.Transfer;
  */
 public class ProtonSessionIncomingWindow {
 
-    private static final long DEFAULT_WINDOW_SIZE = Integer.MAX_VALUE; // 
biggest legal value
+    private static final int DEFAULT_WINDOW_SIZE = Integer.MAX_VALUE; // 
biggest legal value
 
     private final ProtonSession session;
     private final ProtonEngine engine;
@@ -46,16 +46,16 @@ public class ProtonSessionIncomingWindow {
     private int incomingCapacity = 0;
 
     // Computed incoming window based on the incoming capacity minus bytes not 
yet read from deliveries.
-    private long incomingWindow = 0;
+    private int incomingWindow = 0;
 
     // Tracks the next expected incoming transfer ID from the remote
-    private long nextIncomingId = 0;
+    private int nextIncomingId = 0;
 
     // Tracks the most recent delivery Id for validation against the next 
incoming delivery
     private SequenceNumber lastDeliveryid;
 
-    private long maxFrameSize;
-    private long incomingBytes;
+    private int maxFrameSize;
+    private int incomingBytes;
 
     private UnsettledMap<ProtonIncomingDelivery> unsettled =
         new UnsettledMap<>(ProtonIncomingDelivery::getDeliveryIdInt);
@@ -63,7 +63,7 @@ public class ProtonSessionIncomingWindow {
     public ProtonSessionIncomingWindow(ProtonSession session) {
         this.session = session;
         this.engine = session.getConnection().getEngine();
-        this.maxFrameSize = session.getConnection().getMaxFrameSize();
+        this.maxFrameSize = (int) session.getConnection().getMaxFrameSize();
     }
 
     public void setIncomingCapacity(int incomingCapacity) {
@@ -75,11 +75,10 @@ public class ProtonSessionIncomingWindow {
     }
 
     public int getRemainingIncomingCapacity() {
-        // TODO: This is linked to below update of capacity which also needs 
more attention.
-        if (incomingCapacity <= 0 || maxFrameSize == 
UnsignedInteger.MAX_VALUE.longValue()) {
-            return (int) DEFAULT_WINDOW_SIZE;
+        if (incomingCapacity <= 0 || maxFrameSize == 
UnsignedInteger.MAX_VALUE.intValue()) {
+            return DEFAULT_WINDOW_SIZE;
         } else {
-            return (int) (incomingCapacity - incomingBytes);
+            return incomingCapacity - incomingBytes;
         }
     }
 
@@ -93,7 +92,7 @@ public class ProtonSessionIncomingWindow {
      */
     Begin configureOutbound(Begin begin) {
         // Update as it might have changed if session created before 
connection open() called.
-        this.maxFrameSize = session.getConnection().getMaxFrameSize();
+        this.maxFrameSize = (int) session.getConnection().getMaxFrameSize();
 
         return begin.setIncomingWindow(updateIncomingWindow());
     }
@@ -108,7 +107,7 @@ public class ProtonSessionIncomingWindow {
      */
     Begin handleBegin(Begin begin) {
         if (begin.hasNextOutgoingId()) {
-            this.nextIncomingId = begin.getNextOutgoingId();
+            this.nextIncomingId = 
UnsignedInteger.valueOf(begin.getNextOutgoingId()).intValue();
         }
 
         return begin;
@@ -137,7 +136,8 @@ public class ProtonSessionIncomingWindow {
         incomingWindow--;
         nextIncomingId++;
 
-        ProtonIncomingDelivery delivery = link.remoteTransfer(transfer, 
payload);
+        final ProtonIncomingDelivery delivery = link.remoteTransfer(transfer, 
payload);
+
         if (!delivery.isSettled() && !delivery.isRemotelySettled() && 
delivery.isFirstTransfer()) {
             unsettled.put((int) delivery.getDeliveryId(), delivery);
         }
@@ -186,12 +186,10 @@ public class ProtonSessionIncomingWindow {
     }
 
     long updateIncomingWindow() {
-        // TODO - need to revisit this logic and decide on sane cutoff for 
capacity restriction.
         if (incomingCapacity <= 0 || maxFrameSize == 
UnsignedInteger.MAX_VALUE.longValue()) {
             incomingWindow = DEFAULT_WINDOW_SIZE;
         } else {
-            // TODO - incomingWindow = Integer.divideUnsigned(incomingCapacity 
- incomingBytes, maxFrameSize);
-            incomingWindow = (incomingCapacity - incomingBytes) / maxFrameSize;
+            incomingWindow = Integer.divideUnsigned(incomingCapacity - 
incomingBytes, maxFrameSize);
         }
 
         return incomingWindow;
@@ -205,14 +203,14 @@ public class ProtonSessionIncomingWindow {
     //----- Access to internal state useful for tests
 
     public long getIncomingBytes() {
-        return incomingBytes;
+        return Integer.toUnsignedLong(incomingBytes);
     }
 
-    public long getNextIncomingId() {
+    public int getNextIncomingId() {
         return nextIncomingId;
     }
 
-    public long getIncomingWindow() {
+    public int getIncomingWindow() {
         return incomingWindow;
     }
 
diff --git 
a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonReceiverTest.java
 
b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonReceiverTest.java
index 7b00f95b..56dcff16 100644
--- 
a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonReceiverTest.java
+++ 
b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonReceiverTest.java
@@ -4654,4 +4654,70 @@ public class ProtonReceiverTest extends 
ProtonEngineTestSupport {
         peer.waitForScriptToComplete();
         assertNull(failure);
     }
+
+    @Test
+    public void testReceiverUpdatesFlowOutgoingIdAfterOverflow() {
+        Engine engine = EngineFactory.PROTON.createNonSaslEngine();
+        engine.errorHandler(result -> failure = result.failureCause());
+        ProtonTestConnector peer = createTestPeer(engine);
+
+        final byte[] payload = new byte[] { 1 };
+
+        peer.expectAMQPHeader().respondWithAMQPHeader();
+        peer.expectOpen().respond().withContainerId("driver");
+        
peer.expectBegin().respond().withNextOutgoingId(UnsignedInteger.MAX_VALUE.intValue());
+        peer.expectAttach().respond();
+        peer.expectFlow().withLinkCredit(1)
+                         
.withNextIncomingId(UnsignedInteger.MAX_VALUE.intValue());
+        peer.remoteTransfer().withDeliveryId(1)
+                             .withDeliveryTag(new byte[] {1})
+                             .withMore(false)
+                             .withMessageFormat(0)
+                             .withPayload(payload).queue();
+        peer.expectDisposition().withFirst(1)
+                                .withSettled(true)
+                                .withState().accepted();
+
+        Connection connection = engine.start().open();
+        Session session = connection.session().open();
+        Receiver receiver = session.receiver("receiver");
+        receiver.deliveryReadHandler((delivery) -> {
+            delivery.disposition(Accepted.getInstance(), true);
+        });
+
+        receiver.addCredit(1);
+        receiver.open();
+
+        peer.waitForScriptToComplete();
+        peer.expectFlow().withLinkCredit(1)
+                         .withNextIncomingId(0);
+        peer.remoteTransfer().withDeliveryId(2)
+                             .withDeliveryTag(new byte[] {1})
+                             .withMore(false)
+                             .withMessageFormat(0)
+                             .withPayload(payload).queue();
+        peer.expectDisposition().withFirst(2)
+                                .withSettled(true)
+                                .withState().accepted();
+
+        receiver.addCredit(1);
+
+        peer.waitForScriptToComplete();
+        peer.expectFlow().withLinkCredit(1)
+                         .withNextIncomingId(1);
+
+        receiver.addCredit(1);
+
+        peer.expectDetach().respond();
+        peer.expectEnd().respond();
+        peer.expectClose().respond();
+
+        receiver.close();
+        session.close();
+        connection.close();
+
+        // Check post conditions and done.
+        peer.waitForScriptToComplete();
+        assertNull(failure);
+    }
 }
diff --git 
a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionIncomingWindowTest.java
 
b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionIncomingWindowTest.java
new file mode 100644
index 00000000..db530a5a
--- /dev/null
+++ 
b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionIncomingWindowTest.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.protonj2.engine.impl;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+import org.apache.qpid.protonj2.buffer.ProtonBuffer;
+import org.apache.qpid.protonj2.buffer.ProtonBufferAllocator;
+import org.apache.qpid.protonj2.types.UnsignedInteger;
+import org.apache.qpid.protonj2.types.transport.Begin;
+import org.apache.qpid.protonj2.types.transport.Transfer;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Test the proton incoming window class for specification compliant behavior
+ */
+@ExtendWith(MockitoExtension.class)
+class ProtonSessionIncomingWindowTest {
+
+    private static final int DEFAULT_SESSION_CAPACITY = 100_000;
+
+    private static final int DEFAULT_MAX_FRAME_SIZE = 100_000;
+
+    @Mock
+    ProtonConnection connection;
+
+    @Mock
+    ProtonSession session;
+
+    @Mock
+    ProtonReceiver receiver;
+
+    @Mock
+    ProtonIncomingDelivery delivery;
+
+    ProtonSessionIncomingWindow window;
+
+    final Begin remoteBegin = new Begin();
+    final Begin localBegin = new Begin();
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        when(session.getConnection()).thenReturn(connection);
+        when(session.getConnection().getMaxFrameSize()).thenReturn((long) 
DEFAULT_MAX_FRAME_SIZE);
+
+        window = new ProtonSessionIncomingWindow(session);
+    }
+
+    @Test
+    public void testStateAfterBasicBeginExchange() {
+        localBegin.setNextOutgoingId(0);
+        remoteBegin.setNextOutgoingId(UnsignedInteger.MAX_VALUE.intValue());
+
+        window.configureOutbound(localBegin);
+        window.handleBegin(remoteBegin);
+
+        assertEquals(UnsignedInteger.MAX_VALUE.intValue(), 
window.getNextIncomingId());
+        assertEquals(0, window.getIncomingCapacity());
+        assertEquals(Integer.MAX_VALUE, window.getRemainingIncomingCapacity());
+
+        window.setIncomingCapacity(DEFAULT_SESSION_CAPACITY);
+
+        assertEquals(DEFAULT_SESSION_CAPACITY, window.getIncomingCapacity());
+    }
+
+    @Test
+    public void testIncomingWindowUpdates() {
+        localBegin.setNextOutgoingId(0);
+        remoteBegin.setNextOutgoingId(0);
+
+        window.configureOutbound(localBegin);
+        assertEquals(Integer.MAX_VALUE, localBegin.getIncomingWindow());
+        window.handleBegin(remoteBegin);
+        assertEquals(Integer.MAX_VALUE, window.getIncomingWindow());
+
+        // Less than one frame worth of capacity
+        window.setIncomingCapacity(1);
+        window.configureOutbound(localBegin);
+        assertEquals(0, localBegin.getIncomingWindow());
+
+        // Exactly one frame worth of capacity
+        window.setIncomingCapacity(DEFAULT_MAX_FRAME_SIZE);
+        window.configureOutbound(localBegin);
+        assertEquals(1, localBegin.getIncomingWindow());
+
+        // Little more than one frame worth of capacity
+        window.setIncomingCapacity(DEFAULT_MAX_FRAME_SIZE + 10);
+        window.configureOutbound(localBegin);
+        assertEquals(1, localBegin.getIncomingWindow());
+
+        // Two frame worth of capacity
+        window.setIncomingCapacity(DEFAULT_MAX_FRAME_SIZE * 2);
+        window.configureOutbound(localBegin);
+        assertEquals(2, localBegin.getIncomingWindow());
+    }
+
+    @Test
+    public void testIncomingWindowUpdatesFrameSizeGreaterThanIntMaxVale() {
+        when(session.getConnection().getMaxFrameSize()).thenReturn((long) 
Integer.MAX_VALUE + 100);
+
+        window = new ProtonSessionIncomingWindow(session);
+
+        localBegin.setNextOutgoingId(0);
+        remoteBegin.setNextOutgoingId(0);
+
+        window.configureOutbound(localBegin);
+        assertEquals(Integer.MAX_VALUE, localBegin.getIncomingWindow());
+        window.handleBegin(remoteBegin);
+        assertEquals(Integer.MAX_VALUE, window.getIncomingWindow());
+
+        // Less than one frame worth of capacity
+        window.setIncomingCapacity(Integer.MAX_VALUE);
+        window.configureOutbound(localBegin);
+        assertEquals(0, localBegin.getIncomingWindow());
+
+        // Exactly one frame worth of capacity
+        window.setIncomingCapacity(Integer.MAX_VALUE + 100);
+        window.configureOutbound(localBegin);
+        assertEquals(Integer.MAX_VALUE, localBegin.getIncomingWindow());
+
+        // Little more than one frame worth of capacity
+        window.setIncomingCapacity(Integer.MAX_VALUE + 200);
+        window.configureOutbound(localBegin);
+        assertEquals(Integer.MAX_VALUE, localBegin.getIncomingWindow());
+
+        // Two frame worth of capacity
+        window.setIncomingCapacity(Integer.MAX_VALUE * 2);
+        window.configureOutbound(localBegin);
+        assertEquals(Integer.MAX_VALUE, localBegin.getIncomingWindow());
+    }
+
+    @Test
+    public void testTransferHandlingIncomingCapacityUpdates() {
+        localBegin.setNextOutgoingId(0);
+        remoteBegin.setNextOutgoingId(0);
+
+        window.setIncomingCapacity(DEFAULT_SESSION_CAPACITY);
+        window.configureOutbound(localBegin);
+        window.handleBegin(remoteBegin);
+
+        when(receiver.remoteTransfer(any(), any())).thenReturn(delivery);
+
+        assertEquals(0, window.getNextIncomingId());
+        assertEquals(DEFAULT_SESSION_CAPACITY, window.getIncomingCapacity());
+        assertEquals(DEFAULT_SESSION_CAPACITY, 
window.getRemainingIncomingCapacity());
+
+        final Transfer transfer = new Transfer();
+        final ProtonBuffer payload = 
ProtonBufferAllocator.defaultAllocator().allocateHeapBuffer(10).setWriteOffset(10);
+
+        window.handleTransfer(receiver, transfer, payload);
+
+        assertEquals(DEFAULT_SESSION_CAPACITY - payload.getReadableBytes(), 
window.getRemainingIncomingCapacity());
+
+        window.deliveryRead(delivery, payload.getReadableBytes());
+
+        assertEquals(DEFAULT_SESSION_CAPACITY, window.getIncomingCapacity());
+    }
+
+    @Test
+    public void testNextIncomingIdStartingAtZero() {
+        localBegin.setNextOutgoingId(0);
+        remoteBegin.setNextOutgoingId(0);
+
+        window.setIncomingCapacity(DEFAULT_SESSION_CAPACITY * 3);
+        window.configureOutbound(localBegin);
+        window.handleBegin(remoteBegin);
+
+        when(receiver.remoteTransfer(any(), any())).thenReturn(delivery);
+
+        final Transfer transfer = new Transfer();
+        final ProtonBuffer payload = 
ProtonBufferAllocator.defaultAllocator().allocateHeapBuffer();
+
+        payload.writeInt(255);
+
+        assertEquals(0, window.getNextIncomingId());
+        assertEquals(3, window.getIncomingWindow());
+        window.handleTransfer(receiver, transfer, payload);
+        assertEquals(1, window.getNextIncomingId());
+        assertEquals(2, window.getIncomingWindow());
+        window.handleTransfer(receiver, transfer, payload);
+        assertEquals(2, window.getNextIncomingId());
+        assertEquals(1, window.getIncomingWindow());
+        window.handleTransfer(receiver, transfer, payload);
+        assertEquals(3, window.getNextIncomingId());
+        assertEquals(0, window.getIncomingWindow());
+    }
+
+    @Test
+    public void testNextIncomingIdStartingAtMaxUInt() {
+        localBegin.setNextOutgoingId(0);
+        remoteBegin.setNextOutgoingId(UnsignedInteger.MAX_VALUE.intValue());
+
+        window.setIncomingCapacity(DEFAULT_SESSION_CAPACITY * 20);
+        window.configureOutbound(localBegin);
+        window.handleBegin(remoteBegin);
+
+        when(receiver.remoteTransfer(any(), any())).thenReturn(delivery);
+
+        final Transfer transfer = new Transfer();
+        final ProtonBuffer payload = 
ProtonBufferAllocator.defaultAllocator().allocateHeapBuffer();
+
+        payload.writeLong(32767);
+
+        assertEquals(UnsignedInteger.MAX_VALUE.intValue(), 
window.getNextIncomingId());
+        assertEquals(20, window.getIncomingWindow());
+        window.handleTransfer(receiver, transfer, payload);
+        assertEquals(0, window.getNextIncomingId());
+        assertEquals(19, window.getIncomingWindow());
+        window.handleTransfer(receiver, transfer, payload);
+        assertEquals(1, window.getNextIncomingId());
+        assertEquals(18, window.getIncomingWindow());
+        window.handleTransfer(receiver, transfer, payload);
+        assertEquals(2, window.getNextIncomingId());
+        assertEquals(17, window.getIncomingWindow());
+    }
+
+    @Test
+    public void testIncomingWindowSetToUIntMaxValueCapsMaxToIntMaxValue() {
+        localBegin.setNextOutgoingId(0);
+        remoteBegin.setNextOutgoingId(Integer.MAX_VALUE);
+
+        window.setIncomingCapacity(UnsignedInteger.MAX_VALUE.intValue());
+        window.configureOutbound(localBegin);
+        window.handleBegin(remoteBegin);
+
+        when(receiver.remoteTransfer(any(), any())).thenReturn(delivery);
+
+        final Transfer transfer = new Transfer();
+        final ProtonBuffer payload = 
ProtonBufferAllocator.defaultAllocator().allocateHeapBuffer();
+
+        payload.writeBoolean(false);
+
+        assertEquals(Integer.MAX_VALUE, window.getNextIncomingId());
+        assertEquals(Integer.MAX_VALUE, window.getIncomingWindow());
+        window.handleTransfer(receiver, transfer, payload);
+        assertEquals(Integer.MAX_VALUE + 1, window.getNextIncomingId());
+        assertEquals(Integer.MAX_VALUE - 1, window.getIncomingWindow());
+        window.handleTransfer(receiver, transfer, payload);
+        assertEquals(Integer.MAX_VALUE + 2, window.getNextIncomingId());
+        assertEquals(Integer.MAX_VALUE - 2, window.getIncomingWindow());
+        window.handleTransfer(receiver, transfer, payload);
+        assertEquals(Integer.MAX_VALUE + 3, window.getNextIncomingId());
+        assertEquals(Integer.MAX_VALUE - 3, window.getIncomingWindow());
+    }
+}
diff --git 
a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionOutgoingWindowTest.java
 
b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionOutgoingWindowTest.java
new file mode 100644
index 00000000..1619c583
--- /dev/null
+++ 
b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionOutgoingWindowTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.protonj2.engine.impl;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.mockito.Mockito.when;
+
+import org.apache.qpid.protonj2.types.transport.Begin;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Test the proton outgoing window class for specification compliant behavior
+ */
+@ExtendWith(MockitoExtension.class)
+class ProtonSessionOutgoingWindowTest {
+
+    private static final int DEFAULT_MAX_FRAME_SIZE = 100_000;
+
+    @Mock
+    ProtonEngineConfiguration configuration;
+
+    @Mock
+    ProtonEngine engine;
+
+    @Mock
+    ProtonConnection connection;
+
+    @Mock
+    ProtonSession session;
+
+    @Mock
+    ProtonOutgoingDelivery delivery;
+
+    ProtonSessionOutgoingWindow window;
+
+    final Begin remoteBegin = new Begin();
+    final Begin localBegin = new Begin();
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        when(configuration.getOutboundMaxFrameSize()).thenReturn((long) 
DEFAULT_MAX_FRAME_SIZE);
+        when(engine.configuration()).thenReturn(configuration);
+        when(session.getEngine()).thenReturn(engine);
+        when(session.getConnection()).thenReturn(connection);
+
+        window = new ProtonSessionOutgoingWindow(session);
+    }
+
+    @Test
+    public void testConfigureOutbound() {
+        window.configureOutbound(localBegin);
+        window.handleBegin(remoteBegin);
+
+        assertEquals(Integer.MAX_VALUE, localBegin.getOutgoingWindow());
+        assertEquals(0, localBegin.getNextOutgoingId());
+
+        assertFalse(window.isSendable());
+        assertEquals(0, window.getNextOutgoingId());
+        assertEquals(-1, window.getOutgoingCapacity()); // No limit set
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to