This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git


The following commit(s) were added to refs/heads/main by this push:
     new f2159293 PROTON-2618 Better handle remote forced close vs connection drop
f2159293 is described below

commit f215929395e44a8a0679befe15f96fd95b634c16
Author: Timothy Bish <tabish...@gmail.com>
AuthorDate: Wed Oct 5 17:42:15 2022 -0400

    PROTON-2618 Better handle remote forced close vs connection drop
    
    Ensure that the receiver doesn't react to a remote forced close of the
    connection if the connection will reconnect, to avoid receive calls
    unblocking with an error while the underlying connection is reconnecting.
---
 .../protonj2/client/impl/ClientConnection.java     |   2 +
 .../qpid/protonj2/client/impl/ClientLinkType.java  |   7 +-
 .../client/impl/ReconnectReceiverTest.java         |  70 ++++++++++
 .../protonj2/client/impl/ReconnectSenderTest.java  | 150 ++++++++++++++++++++-
 .../protonj2/client/impl/StreamReceiverTest.java   |  68 ++++++++++
 5 files changed, 293 insertions(+), 4 deletions(-)

diff --git 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java
 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java
index a1f2dcca..a1b0ce20 100644
--- 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java
+++ 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java
@@ -639,6 +639,8 @@ public final class ClientConnection implements Connection {
     }
 
     private void handleLocalClose(org.apache.qpid.protonj2.engine.Connection 
connection) {
+        // Don't react if the engine was shut down and the parent closed as a result; instead,
+        // wait for the shutdown notification and respond to that change.
         if (connection.isRemotelyClosed()) {
             final ClientException failureCause;
 
diff --git 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientLinkType.java
 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientLinkType.java
index 8907a8f1..62552f18 100644
--- 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientLinkType.java
+++ 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientLinkType.java
@@ -369,9 +369,10 @@ public abstract class ClientLinkType<LinkType extends 
Link<LinkType>,
     }
 
     protected final void handleParentEndpointClosed(ProtonType link) {
-        // Don't react if engine was shutdown and parent closed as a result 
instead wait to get the
-        // shutdown notification and respond to that change.
-        if (link.getEngine().isRunning()) {
+        // This handler is only for the case where the parent session was remotely or locally
+        // closed. In all other cases we want to allow natural engine shutdown handling to
+        // trigger shutdown, as we can check there whether the parent is reconnecting or not.
+        if (link.getEngine().isRunning() && 
!link.getConnection().isLocallyClosed()) {
             final ClientException failureCause;
 
             if (link.getConnection().getRemoteCondition() != null) {
diff --git 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectReceiverTest.java
 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectReceiverTest.java
index e3f0db95..b10df8f9 100644
--- 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectReceiverTest.java
+++ 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectReceiverTest.java
@@ -33,6 +33,7 @@ import 
org.apache.qpid.protonj2.client.exceptions.ClientConnectionRemotelyClosed
 import org.apache.qpid.protonj2.client.test.ImperativeClientTestCase;
 import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
 import org.apache.qpid.protonj2.types.messaging.AmqpValue;
+import org.apache.qpid.protonj2.types.transport.ConnectionError;
 import org.apache.qpid.protonj2.types.transport.Role;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
@@ -236,4 +237,73 @@ class ReconnectReceiverTest extends 
ImperativeClientTestCase {
             assertNotNull(delivery);
         }
     }
+
+    @Test
+    public void testReceiverWaitsWhenConnectionForcedDisconnect() throws 
Exception {
+        final byte[] payload = createEncodedMessage(new AmqpValue<>("Hello 
World"));
+
+        try (ProtonTestServer firstPeer = new ProtonTestServer();
+             ProtonTestServer finalPeer = new ProtonTestServer()) {
+
+            firstPeer.expectSASLAnonymousConnect();
+            firstPeer.expectOpen().respond();
+            firstPeer.expectBegin().respond();
+            
firstPeer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
+            firstPeer.expectFlow().withLinkCredit(10);
+            firstPeer.remoteClose()
+                     
.withErrorCondition(ConnectionError.CONNECTION_FORCED.toString(), "Forced 
disconnect").queue().afterDelay(20);
+            firstPeer.expectClose();
+            firstPeer.start();
+
+            finalPeer.expectSASLAnonymousConnect();
+            finalPeer.expectOpen().respond();
+            finalPeer.expectBegin().respond();
+            
finalPeer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
+            finalPeer.expectFlow().withLinkCredit(10);
+            finalPeer.remoteTransfer().withHandle(0)
+                                      .withDeliveryId(0)
+                                      .withDeliveryTag(new byte[] { 1 })
+                                      .withMore(false)
+                                      .withSettled(true)
+                                      .withMessageFormat(0)
+                                      
.withPayload(payload).queue().afterDelay(5);
+            finalPeer.start();
+
+            final URI primaryURI = firstPeer.getServerURI();
+            final URI backupURI = finalPeer.getServerURI();
+
+            ConnectionOptions options = new ConnectionOptions();
+            options.reconnectOptions().reconnectEnabled(true);
+            
options.reconnectOptions().addReconnectLocation(backupURI.getHost(), 
backupURI.getPort());
+
+            Client container = Client.create();
+            Connection connection = container.connect(primaryURI.getHost(), 
primaryURI.getPort(), options);
+            Session session = connection.openSession();
+            ReceiverOptions rcvOpts = new ReceiverOptions().autoAccept(false);
+            Receiver receiver = session.openReceiver("test-queue", rcvOpts);
+
+            Delivery delivery = null;
+            try {
+                delivery = receiver.receive(10, TimeUnit.SECONDS);
+            } catch (Exception ex) {
+                fail("Should not have failed on blocking receive call." + 
ex.getMessage());
+            }
+
+            assertNotNull(delivery);
+
+            firstPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            finalPeer.waitForScriptToComplete();
+            finalPeer.expectDetach().respond();
+            finalPeer.expectEnd().respond();
+            finalPeer.expectClose().respond();
+
+            delivery.accept();
+
+            receiver.close();
+            session.close();
+            connection.close();
+
+            assertNotNull(delivery);
+        }
+    }
 }
diff --git 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectSenderTest.java
 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectSenderTest.java
index 3aa88a8c..24711f16 100644
--- 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectSenderTest.java
+++ 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectSenderTest.java
@@ -43,6 +43,7 @@ import 
org.apache.qpid.protonj2.client.exceptions.ClientConnectionRemotelyClosed
 import org.apache.qpid.protonj2.client.exceptions.ClientException;
 import org.apache.qpid.protonj2.client.test.ImperativeClientTestCase;
 import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
+import org.apache.qpid.protonj2.types.transport.ConnectionError;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
@@ -161,6 +162,153 @@ class ReconnectSenderTest extends 
ImperativeClientTestCase {
        }
     }
 
+    @Test
+    public void testInFlightSendFailedAfterConnectionForcedCloseAndNotResent() 
throws Exception {
+        try (ProtonTestServer firstPeer = new ProtonTestServer();
+             ProtonTestServer finalPeer = new ProtonTestServer()) {
+
+           firstPeer.expectSASLAnonymousConnect();
+           firstPeer.expectOpen().respond();
+           firstPeer.expectBegin().respond();
+           
firstPeer.expectAttach().ofSender().withTarget().withAddress("test").and().respond();
+           firstPeer.remoteFlow().withLinkCredit(1).queue();
+           firstPeer.expectTransfer().withNonNullPayload();
+           firstPeer.remoteClose()
+                    
.withErrorCondition(ConnectionError.CONNECTION_FORCED.toString(), "Forced 
disconnect").queue().afterDelay(20);
+           firstPeer.expectClose();
+           firstPeer.start();
+
+           finalPeer.expectSASLAnonymousConnect();
+           finalPeer.expectOpen().respond();
+           finalPeer.expectBegin().respond();
+           
finalPeer.expectAttach().ofSender().withTarget().withAddress("test").and().respond();
+           finalPeer.start();
+
+           final URI primaryURI = firstPeer.getServerURI();
+           final URI backupURI = finalPeer.getServerURI();
+
+           ConnectionOptions options = new ConnectionOptions();
+           options.reconnectOptions().reconnectEnabled(true);
+           
options.reconnectOptions().addReconnectLocation(backupURI.getHost(), 
backupURI.getPort());
+
+           Client container = Client.create();
+           Connection connection = container.connect(primaryURI.getHost(), 
primaryURI.getPort(), options);
+           Session session = connection.openSession();
+           Sender sender = session.openSender("test");
+
+           final AtomicReference<Tracker> tracker = new AtomicReference<>();
+           final AtomicReference<ClientException> error = new 
AtomicReference<>();
+           final CountDownLatch latch = new CountDownLatch(1);
+
+           ForkJoinPool.commonPool().execute(() -> {
+               try {
+                   tracker.set(sender.send(Message.create("Hello")));
+               } catch (ClientException e) {
+                   error.set(e);
+               } finally {
+                   latch.countDown();
+               }
+           });
+
+           firstPeer.waitForScriptToComplete();
+           finalPeer.waitForScriptToComplete();
+           finalPeer.expectDetach().withClosed(true).respond();
+           finalPeer.expectEnd().respond();
+           finalPeer.expectClose().respond();
+
+           assertTrue(latch.await(10, TimeUnit.SECONDS), "Should have failed 
previously sent message");
+           assertNotNull(tracker.get());
+           assertNull(error.get());
+           assertThrows(ClientConnectionRemotelyClosedException.class, () -> 
tracker.get().awaitSettlement());
+
+           sender.close();
+           session.close();
+           connection.close();
+
+           finalPeer.waitForScriptToComplete();
+       }
+    }
+
+    @Test
+    public void 
testSendBlockedOnCreditGetsSentAfterReconnectFromForcedCloseAndCreditGranted() 
throws Exception {
+        try (ProtonTestServer firstPeer = new ProtonTestServer();
+             ProtonTestServer finalPeer = new ProtonTestServer()) {
+
+           firstPeer.expectSASLAnonymousConnect();
+           firstPeer.expectOpen().respond();
+           firstPeer.expectBegin().respond();
+           
firstPeer.expectAttach().ofSender().withTarget().withAddress("test").and().respond();
+           firstPeer.remoteClose()
+                    
.withErrorCondition(ConnectionError.CONNECTION_FORCED.toString(), "Forced 
disconnect").queue().afterDelay(20);
+           firstPeer.expectClose();
+           firstPeer.start();
+
+           finalPeer.expectSASLAnonymousConnect();
+           finalPeer.expectOpen().respond();
+           finalPeer.expectBegin().respond();
+           
finalPeer.expectAttach().ofSender().withTarget().withAddress("test").and().respond();
+           finalPeer.start();
+
+           final URI primaryURI = firstPeer.getServerURI();
+           final URI backupURI = finalPeer.getServerURI();
+
+           ConnectionOptions options = new ConnectionOptions();
+           options.reconnectOptions().reconnectEnabled(true);
+           
options.reconnectOptions().addReconnectLocation(backupURI.getHost(), 
backupURI.getPort());
+
+           Client container = Client.create();
+           Connection connection = container.connect(primaryURI.getHost(), 
primaryURI.getPort(), options);
+           Session session = connection.openSession();
+           Sender sender = session.openSender("test");
+
+           final AtomicReference<Tracker> tracker = new AtomicReference<>();
+           final AtomicReference<Exception> sendError = new 
AtomicReference<>();
+           final CountDownLatch latch = new CountDownLatch(1);
+
+           ForkJoinPool.commonPool().execute(() -> {
+               try {
+                   tracker.set(sender.send(Message.create("Hello")));
+               } catch (ClientException e) {
+                   sendError.set(e);
+               } finally {
+                   latch.countDown();
+               }
+           });
+
+           firstPeer.waitForScriptToComplete();
+           finalPeer.waitForScriptToComplete();
+           finalPeer.expectTransfer().withNonNullPayload()
+                                     .respond()
+                                     .withSettled(true).withState().accepted();
+           finalPeer.expectDetach().withClosed(true).respond();
+           finalPeer.expectEnd().respond();
+           finalPeer.expectClose().respond();
+
+           // Grant credit now and await expected message send.
+           finalPeer.remoteFlow().withDeliveryCount(0)
+                                 .withLinkCredit(10)
+                                 .withIncomingWindow(10)
+                                 .withOutgoingWindow(10)
+                                 .withNextIncomingId(0)
+                                 .withNextOutgoingId(1).now();
+
+           assertTrue(latch.await(10, TimeUnit.SECONDS), "Should have sent 
blocked message");
+           assertNull(sendError.get());
+           assertNotNull(tracker.get());
+
+           Tracker send = tracker.get();
+           assertSame(tracker.get(), send.awaitSettlement(10, 
TimeUnit.SECONDS));
+           assertTrue(send.remoteSettled());
+           assertEquals(DeliveryState.accepted(), send.remoteState());
+
+           sender.close();
+           session.close();
+           connection.close();
+
+           finalPeer.waitForScriptToComplete();
+       }
+    }
+
     @Test
     public void 
testSendBlockedOnCreditGetsSentAfterReconnectAndCreditGranted() throws 
Exception {
         try (ProtonTestServer firstPeer = new ProtonTestServer();
@@ -297,7 +445,7 @@ class ReconnectSenderTest extends ImperativeClientTestCase {
     @Test
     public void 
testMultipleSenderCreationRecoversAfterDropWithNoAttachResponse() throws 
Exception {
         try (ProtonTestServer firstPeer = new ProtonTestServer();
-                ProtonTestServer intermediatePeer = new ProtonTestServer();
+             ProtonTestServer intermediatePeer = new ProtonTestServer();
              ProtonTestServer finalPeer = new ProtonTestServer()) {
 
             firstPeer.expectSASLAnonymousConnect();
diff --git 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamReceiverTest.java
 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamReceiverTest.java
index 2ca78f0a..00d1e60e 100644
--- 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamReceiverTest.java
+++ 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamReceiverTest.java
@@ -78,6 +78,7 @@ import org.apache.qpid.protonj2.types.messaging.Footer;
 import org.apache.qpid.protonj2.types.messaging.Header;
 import org.apache.qpid.protonj2.types.messaging.MessageAnnotations;
 import org.apache.qpid.protonj2.types.messaging.Properties;
+import org.apache.qpid.protonj2.types.transport.ConnectionError;
 import org.apache.qpid.protonj2.types.transport.Role;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
@@ -3860,6 +3861,73 @@ class StreamReceiverTest extends 
ImperativeClientTestCase {
        }
     }
 
+    @Test
+    public void testReceiverWaitsWhenConnectionForcedDisconnect() throws 
Exception {
+        final byte[] payload = createEncodedMessage(new AmqpValue<>("Hello 
World"));
+
+        try (ProtonTestServer firstPeer = new ProtonTestServer();
+             ProtonTestServer finalPeer = new ProtonTestServer()) {
+
+            firstPeer.expectSASLAnonymousConnect();
+            firstPeer.expectOpen().respond();
+            firstPeer.expectBegin().respond();
+            
firstPeer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
+            firstPeer.expectFlow().withLinkCredit(10);
+            firstPeer.remoteClose()
+                     
.withErrorCondition(ConnectionError.CONNECTION_FORCED.toString(), "Forced 
disconnect").queue().afterDelay(20);
+            firstPeer.expectClose();
+            firstPeer.start();
+
+            finalPeer.expectSASLAnonymousConnect();
+            finalPeer.expectOpen().respond();
+            finalPeer.expectBegin().respond();
+            
finalPeer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
+            finalPeer.expectFlow().withLinkCredit(10);
+            finalPeer.remoteTransfer().withHandle(0)
+                                      .withDeliveryId(0)
+                                      .withDeliveryTag(new byte[] { 1 })
+                                      .withMore(false)
+                                      .withSettled(true)
+                                      .withMessageFormat(0)
+                                      
.withPayload(payload).queue().afterDelay(5);
+            finalPeer.start();
+
+            final URI primaryURI = firstPeer.getServerURI();
+            final URI backupURI = finalPeer.getServerURI();
+
+            ConnectionOptions options = new ConnectionOptions();
+            options.reconnectOptions().reconnectEnabled(true);
+            
options.reconnectOptions().addReconnectLocation(backupURI.getHost(), 
backupURI.getPort());
+
+            Client container = Client.create();
+            Connection connection = container.connect(primaryURI.getHost(), 
primaryURI.getPort(), options);
+            StreamReceiverOptions rcvOpts = new 
StreamReceiverOptions().autoAccept(false);
+            StreamReceiver receiver = 
connection.openStreamReceiver("test-receiver", rcvOpts);
+
+            StreamDelivery delivery = null;
+            try {
+                delivery = receiver.receive(10, TimeUnit.SECONDS);
+            } catch (Exception ex) {
+                fail("Should not have failed on blocking receive call." + 
ex.getMessage());
+            }
+
+            assertNotNull(delivery);
+
+            firstPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            finalPeer.waitForScriptToComplete();
+            finalPeer.expectDetach().respond();
+            finalPeer.expectEnd().respond();
+            finalPeer.expectClose().respond();
+
+            delivery.accept();
+
+            receiver.close();
+            connection.close();
+
+            assertNotNull(delivery);
+        }
+    }
+
     private byte[] createInvalidHeaderEncoding() {
         final byte[] buffer = new byte[12];
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to