This is an automated email from the ASF dual-hosted git repository. tabish pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push: new f2159293 PROTON-2618 Better handle remote forced close vs connection drop f2159293 is described below commit f215929395e44a8a0679befe15f96fd95b634c16 Author: Timothy Bish <tabish...@gmail.com> AuthorDate: Wed Oct 5 17:42:15 2022 -0400 PROTON-2618 Better handle remote forced close vs connection drop Ensure that the receiver doesn't react to remote force close of the connection if the connection will reconnect to avoid receive calls unblocking with error when the underlying connection is reconnecting. --- .../protonj2/client/impl/ClientConnection.java | 2 + .../qpid/protonj2/client/impl/ClientLinkType.java | 7 +- .../client/impl/ReconnectReceiverTest.java | 70 ++++++++++ .../protonj2/client/impl/ReconnectSenderTest.java | 150 ++++++++++++++++++++- .../protonj2/client/impl/StreamReceiverTest.java | 68 ++++++++++ 5 files changed, 293 insertions(+), 4 deletions(-) diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java index a1f2dcca..a1b0ce20 100644 --- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java +++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java @@ -639,6 +639,8 @@ public final class ClientConnection implements Connection { } private void handleLocalClose(org.apache.qpid.protonj2.engine.Connection connection) { + // Don't react if engine was shutdown and parent closed as a result instead wait to get the + // shutdown notification and respond to that change. if (connection.isRemotelyClosed()) { final ClientException failureCause; diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientLinkType.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientLinkType.java index 8907a8f1..62552f18 100644 --- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientLinkType.java +++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientLinkType.java @@ -369,9 +369,10 @@ public abstract class ClientLinkType<LinkType extends Link<LinkType>, } protected final void handleParentEndpointClosed(ProtonType link) { - // Don't react if engine was shutdown and parent closed as a result instead wait to get the - // shutdown notification and respond to that change. - if (link.getEngine().isRunning()) { + // This handle is only for the case that the parent session was remotely or locally + // closed. In all other cases we want to allow natural engine shutdown handling to + // trigger shutdown as we can check there if the parent is reconnecting or not. + if (link.getEngine().isRunning() && !link.getConnection().isLocallyClosed()) { final ClientException failureCause; if (link.getConnection().getRemoteCondition() != null) { diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectReceiverTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectReceiverTest.java index e3f0db95..b10df8f9 100644 --- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectReceiverTest.java +++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectReceiverTest.java @@ -33,6 +33,7 @@ import org.apache.qpid.protonj2.client.exceptions.ClientConnectionRemotelyClosed import org.apache.qpid.protonj2.client.test.ImperativeClientTestCase; import org.apache.qpid.protonj2.test.driver.ProtonTestServer; import org.apache.qpid.protonj2.types.messaging.AmqpValue; +import org.apache.qpid.protonj2.types.transport.ConnectionError; import org.apache.qpid.protonj2.types.transport.Role; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -236,4 +237,73 @@ class ReconnectReceiverTest extends ImperativeClientTestCase { assertNotNull(delivery); } } + + @Test + public void testReceiverWaitsWhenConnectionForcedDisconnect() throws Exception { + final byte[] payload = createEncodedMessage(new AmqpValue<>("Hello World")); + + try (ProtonTestServer firstPeer = new ProtonTestServer(); + ProtonTestServer finalPeer = new ProtonTestServer()) { + + firstPeer.expectSASLAnonymousConnect(); + firstPeer.expectOpen().respond(); + firstPeer.expectBegin().respond(); + firstPeer.expectAttach().withRole(Role.RECEIVER.getValue()).respond(); + firstPeer.expectFlow().withLinkCredit(10); + firstPeer.remoteClose() + .withErrorCondition(ConnectionError.CONNECTION_FORCED.toString(), "Forced disconnect").queue().afterDelay(20); + firstPeer.expectClose(); + firstPeer.start(); + + finalPeer.expectSASLAnonymousConnect(); + finalPeer.expectOpen().respond(); + finalPeer.expectBegin().respond(); + finalPeer.expectAttach().withRole(Role.RECEIVER.getValue()).respond(); + finalPeer.expectFlow().withLinkCredit(10); + finalPeer.remoteTransfer().withHandle(0) + .withDeliveryId(0) + .withDeliveryTag(new byte[] { 1 }) + .withMore(false) + .withSettled(true) + .withMessageFormat(0) + .withPayload(payload).queue().afterDelay(5); + finalPeer.start(); + + final URI primaryURI = firstPeer.getServerURI(); + final URI backupURI = finalPeer.getServerURI(); + + ConnectionOptions options = new ConnectionOptions(); + options.reconnectOptions().reconnectEnabled(true); + options.reconnectOptions().addReconnectLocation(backupURI.getHost(), backupURI.getPort()); + + Client container = Client.create(); + Connection connection = container.connect(primaryURI.getHost(), primaryURI.getPort(), options); + Session session = connection.openSession(); + ReceiverOptions rcvOpts = new ReceiverOptions().autoAccept(false); + Receiver receiver = session.openReceiver("test-queue", rcvOpts); + + Delivery delivery = null; + try { + delivery = receiver.receive(10, TimeUnit.SECONDS); + } catch (Exception ex) { + fail("Should not have failed on blocking receive call." + ex.getMessage()); + } + + assertNotNull(delivery); + + firstPeer.waitForScriptToComplete(5, TimeUnit.SECONDS); + finalPeer.waitForScriptToComplete(); + finalPeer.expectDetach().respond(); + finalPeer.expectEnd().respond(); + finalPeer.expectClose().respond(); + + delivery.accept(); + + receiver.close(); + session.close(); + connection.close(); + + assertNotNull(delivery); + } + } } diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectSenderTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectSenderTest.java index 3aa88a8c..24711f16 100644 --- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectSenderTest.java +++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectSenderTest.java @@ -43,6 +43,7 @@ import org.apache.qpid.protonj2.client.exceptions.ClientConnectionRemotelyClosed import org.apache.qpid.protonj2.client.exceptions.ClientException; import org.apache.qpid.protonj2.client.test.ImperativeClientTestCase; import org.apache.qpid.protonj2.test.driver.ProtonTestServer; +import org.apache.qpid.protonj2.types.transport.ConnectionError; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -161,6 +162,153 @@ class ReconnectSenderTest extends ImperativeClientTestCase { } } + @Test + public void testInFlightSendFailedAfterConnectionForcedCloseAndNotResent() throws Exception { + try (ProtonTestServer firstPeer = new ProtonTestServer(); + ProtonTestServer finalPeer = new ProtonTestServer()) { + + firstPeer.expectSASLAnonymousConnect(); + firstPeer.expectOpen().respond(); + firstPeer.expectBegin().respond(); + firstPeer.expectAttach().ofSender().withTarget().withAddress("test").and().respond(); + firstPeer.remoteFlow().withLinkCredit(1).queue(); + firstPeer.expectTransfer().withNonNullPayload(); + firstPeer.remoteClose() + .withErrorCondition(ConnectionError.CONNECTION_FORCED.toString(), "Forced disconnect").queue().afterDelay(20); + firstPeer.expectClose(); + firstPeer.start(); + + finalPeer.expectSASLAnonymousConnect(); + finalPeer.expectOpen().respond(); + finalPeer.expectBegin().respond(); + finalPeer.expectAttach().ofSender().withTarget().withAddress("test").and().respond(); + finalPeer.start(); + + final URI primaryURI = firstPeer.getServerURI(); + final URI backupURI = finalPeer.getServerURI(); + + ConnectionOptions options = new ConnectionOptions(); + options.reconnectOptions().reconnectEnabled(true); + options.reconnectOptions().addReconnectLocation(backupURI.getHost(), backupURI.getPort()); + + Client container = Client.create(); + Connection connection = container.connect(primaryURI.getHost(), primaryURI.getPort(), options); + Session session = connection.openSession(); + Sender sender = session.openSender("test"); + + final AtomicReference<Tracker> tracker = new AtomicReference<>(); + final AtomicReference<ClientException> error = new AtomicReference<>(); + final CountDownLatch latch = new CountDownLatch(1); + + ForkJoinPool.commonPool().execute(() -> { + try { + tracker.set(sender.send(Message.create("Hello"))); + } catch (ClientException e) { + error.set(e); + } finally { + latch.countDown(); + } + }); + + firstPeer.waitForScriptToComplete(); + finalPeer.waitForScriptToComplete(); + finalPeer.expectDetach().withClosed(true).respond(); + finalPeer.expectEnd().respond(); + finalPeer.expectClose().respond(); + + assertTrue(latch.await(10, TimeUnit.SECONDS), "Should have failed previously sent message"); + assertNotNull(tracker.get()); + assertNull(error.get()); + assertThrows(ClientConnectionRemotelyClosedException.class, () -> tracker.get().awaitSettlement()); + + sender.close(); + session.close(); + connection.close(); + + finalPeer.waitForScriptToComplete(); + } + } + + @Test + public void testSendBlockedOnCreditGetsSentAfterReconnectFromForcedCloseAndCreditGranted() throws Exception { + try (ProtonTestServer firstPeer = new ProtonTestServer(); + ProtonTestServer finalPeer = new ProtonTestServer()) { + + firstPeer.expectSASLAnonymousConnect(); + firstPeer.expectOpen().respond(); + firstPeer.expectBegin().respond(); + firstPeer.expectAttach().ofSender().withTarget().withAddress("test").and().respond(); + firstPeer.remoteClose() + .withErrorCondition(ConnectionError.CONNECTION_FORCED.toString(), "Forced disconnect").queue().afterDelay(20); + firstPeer.expectClose(); + firstPeer.start(); + + finalPeer.expectSASLAnonymousConnect(); + finalPeer.expectOpen().respond(); + finalPeer.expectBegin().respond(); + finalPeer.expectAttach().ofSender().withTarget().withAddress("test").and().respond(); + finalPeer.start(); + + final URI primaryURI = firstPeer.getServerURI(); + final URI backupURI = finalPeer.getServerURI(); + + ConnectionOptions options = new ConnectionOptions(); + options.reconnectOptions().reconnectEnabled(true); + options.reconnectOptions().addReconnectLocation(backupURI.getHost(), backupURI.getPort()); + + Client container = Client.create(); + Connection connection = container.connect(primaryURI.getHost(), primaryURI.getPort(), options); + Session session = connection.openSession(); + Sender sender = session.openSender("test"); + + final AtomicReference<Tracker> tracker = new AtomicReference<>(); + final AtomicReference<Exception> sendError = new AtomicReference<>(); + final CountDownLatch latch = new CountDownLatch(1); + + ForkJoinPool.commonPool().execute(() -> { + try { + tracker.set(sender.send(Message.create("Hello"))); + } catch (ClientException e) { + sendError.set(e); + } finally { + latch.countDown(); + } + }); + + firstPeer.waitForScriptToComplete(); + finalPeer.waitForScriptToComplete(); + finalPeer.expectTransfer().withNonNullPayload() + .respond() + .withSettled(true).withState().accepted(); + finalPeer.expectDetach().withClosed(true).respond(); + finalPeer.expectEnd().respond(); + finalPeer.expectClose().respond(); + + // Grant credit now and await expected message send. + finalPeer.remoteFlow().withDeliveryCount(0) + .withLinkCredit(10) + .withIncomingWindow(10) + .withOutgoingWindow(10) + .withNextIncomingId(0) + .withNextOutgoingId(1).now(); + + assertTrue(latch.await(10, TimeUnit.SECONDS), "Should have sent blocked message"); + assertNull(sendError.get()); + assertNotNull(tracker.get()); + + Tracker send = tracker.get(); + assertSame(tracker.get(), send.awaitSettlement(10, TimeUnit.SECONDS)); + assertTrue(send.remoteSettled()); + assertEquals(DeliveryState.accepted(), send.remoteState()); + + sender.close(); + session.close(); + connection.close(); + + finalPeer.waitForScriptToComplete(); + } + } + @Test public void testSendBlockedOnCreditGetsSentAfterReconnectAndCreditGranted() throws Exception { try (ProtonTestServer firstPeer = new ProtonTestServer(); @@ -297,7 +445,7 @@ class ReconnectSenderTest extends ImperativeClientTestCase { @Test public void testMultipleSenderCreationRecoversAfterDropWithNoAttachResponse() throws Exception { try (ProtonTestServer firstPeer = new ProtonTestServer(); - ProtonTestServer intermediatePeer = new ProtonTestServer(); + ProtonTestServer intermediatePeer = new ProtonTestServer(); ProtonTestServer finalPeer = new ProtonTestServer()) { firstPeer.expectSASLAnonymousConnect(); diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamReceiverTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamReceiverTest.java index 2ca78f0a..00d1e60e 100644 --- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamReceiverTest.java +++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamReceiverTest.java @@ -78,6 +78,7 @@ import org.apache.qpid.protonj2.types.messaging.Footer; import org.apache.qpid.protonj2.types.messaging.Header; import org.apache.qpid.protonj2.types.messaging.MessageAnnotations; import org.apache.qpid.protonj2.types.messaging.Properties; +import org.apache.qpid.protonj2.types.transport.ConnectionError; import org.apache.qpid.protonj2.types.transport.Role; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -3860,6 +3861,73 @@ class StreamReceiverTest extends ImperativeClientTestCase { } } + @Test + public void testReceiverWaitsWhenConnectionForcedDisconnect() throws Exception { + final byte[] payload = createEncodedMessage(new AmqpValue<>("Hello World")); + + try (ProtonTestServer firstPeer = new ProtonTestServer(); + ProtonTestServer finalPeer = new ProtonTestServer()) { + + firstPeer.expectSASLAnonymousConnect(); + firstPeer.expectOpen().respond(); + firstPeer.expectBegin().respond(); + firstPeer.expectAttach().withRole(Role.RECEIVER.getValue()).respond(); + firstPeer.expectFlow().withLinkCredit(10); + firstPeer.remoteClose() + .withErrorCondition(ConnectionError.CONNECTION_FORCED.toString(), "Forced disconnect").queue().afterDelay(20); + firstPeer.expectClose(); + firstPeer.start(); + + finalPeer.expectSASLAnonymousConnect(); + finalPeer.expectOpen().respond(); + finalPeer.expectBegin().respond(); + finalPeer.expectAttach().withRole(Role.RECEIVER.getValue()).respond(); + finalPeer.expectFlow().withLinkCredit(10); + finalPeer.remoteTransfer().withHandle(0) + .withDeliveryId(0) + .withDeliveryTag(new byte[] { 1 }) + .withMore(false) + .withSettled(true) + .withMessageFormat(0) + .withPayload(payload).queue().afterDelay(5); + finalPeer.start(); + + final URI primaryURI = firstPeer.getServerURI(); + final URI backupURI = finalPeer.getServerURI(); + + ConnectionOptions options = new ConnectionOptions(); + options.reconnectOptions().reconnectEnabled(true); + options.reconnectOptions().addReconnectLocation(backupURI.getHost(), backupURI.getPort()); + + Client container = Client.create(); + Connection connection = container.connect(primaryURI.getHost(), primaryURI.getPort(), options); + StreamReceiverOptions rcvOpts = new StreamReceiverOptions().autoAccept(false); + StreamReceiver receiver = connection.openStreamReceiver("test-receiver", rcvOpts); + + StreamDelivery delivery = null; + try { + delivery = receiver.receive(10, TimeUnit.SECONDS); + } catch (Exception ex) { + fail("Should not have failed on blocking receive call." + ex.getMessage()); + } + + assertNotNull(delivery); + + firstPeer.waitForScriptToComplete(5, TimeUnit.SECONDS); + finalPeer.waitForScriptToComplete(); + finalPeer.expectDetach().respond(); + finalPeer.expectEnd().respond(); + finalPeer.expectClose().respond(); + + delivery.accept(); + + receiver.close(); + connection.close(); + + assertNotNull(delivery); + } + } + private byte[] createInvalidHeaderEncoding() { final byte[] buffer = new byte[12]; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org