This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push:
new e662d7fd PROTON-2850 Fix some sporadic test failures that appear in CI
e662d7fd is described below
commit e662d7fd928a1558d875a88485ad6b7162da9175
Author: Timothy Bish <[email protected]>
AuthorDate: Mon Sep 16 15:51:27 2024 -0400
PROTON-2850 Fix some sporadic test failures that appear in CI
---
.../apache/qpid/protonj2/client/impl/ReceiverTest.java | 15 ++++++++++++---
.../qpid/protonj2/client/impl/StreamSenderTest.java | 18 +++++++++---------
2 files changed, 21 insertions(+), 12 deletions(-)
diff --git
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
index 3bcc3651..e630f484 100644
---
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
+++
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
@@ -37,6 +37,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -2458,15 +2459,20 @@ public class ReceiverTest extends
ImperativeClientTestCase {
.withMore(false)
.withMessageFormat(0)
.withPayload(payload).queue();
- peer.dropAfterLastHandler();
+ peer.expectDisposition().optional();
+ peer.dropAfterLastHandler(1);
peer.start();
URI remoteURI = peer.getServerURI();
LOG.info("Test started, peer listening on: {}", remoteURI);
+ final CountDownLatch disconnected = new CountDownLatch(1);
+
Client container = Client.create();
- Connection connection = container.connect(remoteURI.getHost(),
remoteURI.getPort());
+ ConnectionOptions options = new ConnectionOptions();
+ options.disconnectedHandler((c, e) -> disconnected.countDown());
+ Connection connection = container.connect(remoteURI.getHost(),
remoteURI.getPort(), options);
final Receiver receiver = connection.openReceiver("test-queue");
final Delivery delivery = receiver.receive();
@@ -2474,7 +2480,10 @@ public class ReceiverTest extends
ImperativeClientTestCase {
assertNotNull(delivery);
- // Data already read so it will be already available for read.
+ assertTrue(disconnected.await(5, TimeUnit.SECONDS));
+
+ // Data already read so it will be already available for read but
should be unable to
+ // since the connection has dropped.
assertNotEquals(-1, delivery.rawInputStream().read());
connection.close();
diff --git
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamSenderTest.java
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamSenderTest.java
index ab69063c..3a0b31ca 100644
---
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamSenderTest.java
+++
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamSenderTest.java
@@ -297,7 +297,6 @@ public class StreamSenderTest extends
ImperativeClientTestCase {
}
}
- @SuppressWarnings("resource")
@Test
public void testSendCustomMessageWithMultipleAmqpValueSections() throws
Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
@@ -397,7 +396,6 @@ public class StreamSenderTest extends
ImperativeClientTestCase {
}
}
- @SuppressWarnings("resource")
@Test
public void testClearBodySectionsIsNoOpForStreamSenderMessage() throws
Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
@@ -1478,7 +1476,6 @@ public class StreamSenderTest extends
ImperativeClientTestCase {
}
}
- @SuppressWarnings("resource")
@Test
void testRawOutputStreamFromMessageWritesUnmodifiedBytes() throws
Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
@@ -1677,7 +1674,6 @@ public class StreamSenderTest extends
ImperativeClientTestCase {
}
}
- @SuppressWarnings("resource")
@Test
public void testStreamSenderWritesFooterAfterStreamClosed() throws
Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
@@ -1893,7 +1889,6 @@ public class StreamSenderTest extends
ImperativeClientTestCase {
});
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
-
peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(1).withLinkCredit(10).now();
peer.expectTransfer().withNonNullPayload().withMore(true);
peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(2).withLinkCredit(10).queue();
peer.expectTransfer().withNonNullPayload().withMore(true);
@@ -1904,6 +1899,9 @@ public class StreamSenderTest extends
ImperativeClientTestCase {
peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(5).withLinkCredit(10).queue();
peer.expectTransfer().withNonNullPayload().withMore(false).accept();
+ // Initiate message flow by now granting the first credit
+
peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(1).withLinkCredit(10).now();
+
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDetach().respond();
peer.expectEnd().respond();
@@ -2037,11 +2035,13 @@ public class StreamSenderTest extends
ImperativeClientTestCase {
});
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
-
peer.remoteFlow().withIncomingWindow(1).withDeliveryCount(0).withNextIncomingId(1).withLinkCredit(1).now();
peer.expectTransfer().withNonNullPayload().withMore(false).respond().withSettled(true).withState().accepted();
peer.remoteFlow().withIncomingWindow(1).withDeliveryCount(1).withNextIncomingId(2).withLinkCredit(1).queue();
peer.expectTransfer().withNonNullPayload().withMore(false).respond().withSettled(true).withState().accepted();
+ // Initiate message flow by now granting the first credit
+
peer.remoteFlow().withIncomingWindow(1).withDeliveryCount(0).withNextIncomingId(1).withLinkCredit(1).now();
+
assertTrue(send2Completed.await(10, TimeUnit.SECONDS));
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
@@ -2108,7 +2108,6 @@ public class StreamSenderTest extends
ImperativeClientTestCase {
});
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
-
peer.remoteFlow().withIncomingWindow(1).withDeliveryCount(0).withNextIncomingId(1).withLinkCredit(1).now();
peer.expectTransfer().withNonNullPayload().withMore(true);
peer.remoteFlow().withIncomingWindow(1).withDeliveryCount(0).withNextIncomingId(2).withLinkCredit(1).queue();
peer.expectTransfer().withNonNullPayload().withMore(false).respond().withSettled(true).withState().accepted();
@@ -2117,6 +2116,9 @@ public class StreamSenderTest extends
ImperativeClientTestCase {
peer.remoteFlow().withIncomingWindow(1).withDeliveryCount(1).withNextIncomingId(4).withLinkCredit(1).queue();
peer.expectTransfer().withNonNullPayload().withMore(false).respond().withSettled(true).withState().accepted();
+ // Initiate message flow by now granting the first credits
+
peer.remoteFlow().withIncomingWindow(1).withDeliveryCount(0).withNextIncomingId(1).withLinkCredit(1).now();
+
assertTrue(send2Completed.await(10, TimeUnit.SECONDS));
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
@@ -2133,7 +2135,6 @@ public class StreamSenderTest extends
ImperativeClientTestCase {
}
}
- @SuppressWarnings("resource")
@Test
void testMessageSendWhileStreamSendIsOpenShouldBlock() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
@@ -2199,7 +2200,6 @@ public class StreamSenderTest extends
ImperativeClientTestCase {
}
}
- @SuppressWarnings("resource")
@Test
public void testStreamSenderSessionCannotCreateNewResources() throws
Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]