This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git


The following commit(s) were added to refs/heads/main by this push:
     new e662d7fd PROTON-2850 Fix some sporadic test failures that appear in CI
e662d7fd is described below

commit e662d7fd928a1558d875a88485ad6b7162da9175
Author: Timothy Bish <[email protected]>
AuthorDate: Mon Sep 16 15:51:27 2024 -0400

    PROTON-2850 Fix some sporadic test failures that appear in CI
---
 .../apache/qpid/protonj2/client/impl/ReceiverTest.java | 15 ++++++++++++---
 .../qpid/protonj2/client/impl/StreamSenderTest.java    | 18 +++++++++---------
 2 files changed, 21 insertions(+), 12 deletions(-)

diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
index 3bcc3651..e630f484 100644
--- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
+++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
@@ -37,6 +37,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -2458,15 +2459,20 @@ public class ReceiverTest extends ImperativeClientTestCase {
                                  .withMore(false)
                                  .withMessageFormat(0)
                                  .withPayload(payload).queue();
-            peer.dropAfterLastHandler();
+            peer.expectDisposition().optional();
+            peer.dropAfterLastHandler(1);
             peer.start();
 
             URI remoteURI = peer.getServerURI();
 
             LOG.info("Test started, peer listening on: {}", remoteURI);
 
+            final CountDownLatch disconnected = new CountDownLatch(1);
+
             Client container = Client.create();
-            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
+            ConnectionOptions options = new ConnectionOptions();
+            options.disconnectedHandler((c, e) -> disconnected.countDown());
+            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options);
             final Receiver receiver = connection.openReceiver("test-queue");
             final Delivery delivery = receiver.receive();
 
@@ -2474,7 +2480,10 @@ public class ReceiverTest extends 
ImperativeClientTestCase {
 
             assertNotNull(delivery);
 
-            // Data already read so it will be already available for read.
+            assertTrue(disconnected.await(5, TimeUnit.SECONDS));
+
+            // Data already read so it will be already available for read but should be unable to
+            // since the connection has dropped.
             assertNotEquals(-1, delivery.rawInputStream().read());
 
             connection.close();
diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamSenderTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamSenderTest.java
index ab69063c..3a0b31ca 100644
--- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamSenderTest.java
+++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamSenderTest.java
@@ -297,7 +297,6 @@ public class StreamSenderTest extends 
ImperativeClientTestCase {
         }
     }
 
-    @SuppressWarnings("resource")
     @Test
     public void testSendCustomMessageWithMultipleAmqpValueSections() throws 
Exception {
         try (ProtonTestServer peer = new ProtonTestServer()) {
@@ -397,7 +396,6 @@ public class StreamSenderTest extends 
ImperativeClientTestCase {
         }
     }
 
-    @SuppressWarnings("resource")
     @Test
     public void testClearBodySectionsIsNoOpForStreamSenderMessage() throws 
Exception {
         try (ProtonTestServer peer = new ProtonTestServer()) {
@@ -1478,7 +1476,6 @@ public class StreamSenderTest extends 
ImperativeClientTestCase {
         }
     }
 
-    @SuppressWarnings("resource")
     @Test
     void testRawOutputStreamFromMessageWritesUnmodifiedBytes() throws 
Exception {
         try (ProtonTestServer peer = new ProtonTestServer()) {
@@ -1677,7 +1674,6 @@ public class StreamSenderTest extends 
ImperativeClientTestCase {
         }
     }
 
-    @SuppressWarnings("resource")
     @Test
     public void testStreamSenderWritesFooterAfterStreamClosed() throws 
Exception {
         try (ProtonTestServer peer = new ProtonTestServer()) {
@@ -1893,7 +1889,6 @@ public class StreamSenderTest extends 
ImperativeClientTestCase {
             });
 
             peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
-            peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(1).withLinkCredit(10).now();
             peer.expectTransfer().withNonNullPayload().withMore(true);
             
peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(2).withLinkCredit(10).queue();
             peer.expectTransfer().withNonNullPayload().withMore(true);
@@ -1904,6 +1899,9 @@ public class StreamSenderTest extends 
ImperativeClientTestCase {
             
peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(5).withLinkCredit(10).queue();
             
peer.expectTransfer().withNonNullPayload().withMore(false).accept();
 
+            // Initiate message flow by now granting the first credit
+            
peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(1).withLinkCredit(10).now();
+
             peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
             peer.expectDetach().respond();
             peer.expectEnd().respond();
@@ -2037,11 +2035,13 @@ public class StreamSenderTest extends 
ImperativeClientTestCase {
             });
 
             peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
-            
peer.remoteFlow().withIncomingWindow(1).withDeliveryCount(0).withNextIncomingId(1).withLinkCredit(1).now();
             
peer.expectTransfer().withNonNullPayload().withMore(false).respond().withSettled(true).withState().accepted();
             
peer.remoteFlow().withIncomingWindow(1).withDeliveryCount(1).withNextIncomingId(2).withLinkCredit(1).queue();
             
peer.expectTransfer().withNonNullPayload().withMore(false).respond().withSettled(true).withState().accepted();
 
+            // Initiate message flow by now granting the first credit
+            
peer.remoteFlow().withIncomingWindow(1).withDeliveryCount(0).withNextIncomingId(1).withLinkCredit(1).now();
+
             assertTrue(send2Completed.await(10, TimeUnit.SECONDS));
 
             peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
@@ -2108,7 +2108,6 @@ public class StreamSenderTest extends 
ImperativeClientTestCase {
             });
 
             peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
-            
peer.remoteFlow().withIncomingWindow(1).withDeliveryCount(0).withNextIncomingId(1).withLinkCredit(1).now();
             peer.expectTransfer().withNonNullPayload().withMore(true);
             
peer.remoteFlow().withIncomingWindow(1).withDeliveryCount(0).withNextIncomingId(2).withLinkCredit(1).queue();
             
peer.expectTransfer().withNonNullPayload().withMore(false).respond().withSettled(true).withState().accepted();
@@ -2117,6 +2116,9 @@ public class StreamSenderTest extends 
ImperativeClientTestCase {
             
peer.remoteFlow().withIncomingWindow(1).withDeliveryCount(1).withNextIncomingId(4).withLinkCredit(1).queue();
             
peer.expectTransfer().withNonNullPayload().withMore(false).respond().withSettled(true).withState().accepted();
 
+            // Initiate message flow by now granting the first credits
+            
peer.remoteFlow().withIncomingWindow(1).withDeliveryCount(0).withNextIncomingId(1).withLinkCredit(1).now();
+
             assertTrue(send2Completed.await(10, TimeUnit.SECONDS));
 
             peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
@@ -2133,7 +2135,6 @@ public class StreamSenderTest extends 
ImperativeClientTestCase {
         }
     }
 
-    @SuppressWarnings("resource")
     @Test
     void testMessageSendWhileStreamSendIsOpenShouldBlock() throws Exception {
         try (ProtonTestServer peer = new ProtonTestServer()) {
@@ -2199,7 +2200,6 @@ public class StreamSenderTest extends 
ImperativeClientTestCase {
         }
     }
 
-    @SuppressWarnings("resource")
     @Test
     public void testStreamSenderSessionCannotCreateNewResources() throws 
Exception {
         try (ProtonTestServer peer = new ProtonTestServer()) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to