This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git


The following commit(s) were added to refs/heads/main by this push:
     new 9a49692  PROTON-2513 Perform all the close work asynchronously when 
requested
9a49692 is described below

commit 9a49692b18334fa0913aafa56767e4489e06002b
Author: Timothy Bish <[email protected]>
AuthorDate: Fri Mar 4 14:50:07 2022 -0500

    PROTON-2513 Perform all the close work asynchronously when requested
    
    Ensures that Client and Connection closeAsync are truly asynchronous and
    will complete the close future once the remote responds to a close
    request if not already closed.  Fixes some races and ensures that work
    is not added to the event loop when not necessary for already closed
    resources.
---
 .../protonj2/client/impl/ClientConnection.java     | 86 ++++++++++------------
 .../qpid/protonj2/client/impl/ClientInstance.java  | 37 +++++-----
 .../qpid/protonj2/client/impl/ClientReceiver.java  |  1 +
 .../qpid/protonj2/client/impl/ClientSender.java    | 29 ++++----
 .../qpid/protonj2/client/impl/ClientSession.java   |  2 +-
 .../protonj2/client/impl/ClientStreamReceiver.java | 31 ++++----
 .../protonj2/client/impl/ClientStreamSender.java   |  2 +
 .../protonj2/client/transport/NettyIOContext.java  | 13 ++++
 .../qpid/protonj2/client/impl/ClientTest.java      | 42 ++++++++++-
 9 files changed, 149 insertions(+), 94 deletions(-)

diff --git 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java
 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java
index 184f7bf..9715ecf 100644
--- 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java
+++ 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java
@@ -89,8 +89,8 @@ public class ClientConnection implements Connection {
     // later on we may decide this is overly optimized.
     private static final AtomicIntegerFieldUpdater<ClientConnection> 
CLOSED_UPDATER =
             AtomicIntegerFieldUpdater.newUpdater(ClientConnection.class, 
"closed");
-    private static final AtomicReferenceFieldUpdater<ClientConnection, 
ClientException> FAILURE_CAUSE_UPDATER =
-            AtomicReferenceFieldUpdater.newUpdater(ClientConnection.class, 
ClientException.class, "failureCause");
+    private static final AtomicReferenceFieldUpdater<ClientConnection, 
ClientIOException> FAILURE_CAUSE_UPDATER =
+            AtomicReferenceFieldUpdater.newUpdater(ClientConnection.class, 
ClientIOException.class, "failureCause");
 
     private final ClientInstance client;
     private final ConnectionOptions options;
@@ -113,7 +113,7 @@ public class ClientConnection implements Connection {
     private ClientFuture<Connection> openFuture;
     private ClientFuture<Connection> closeFuture;
     private volatile int closed;
-    private volatile ClientException failureCause;
+    private volatile ClientIOException failureCause;
     private long totalConnections;
     private long reconnectAttempts;
     private long nextReconnectDelay = -1;
@@ -197,6 +197,7 @@ public class ClientConnection implements Connection {
     private Future<Connection> doClose(ErrorCondition error) {
         if (CLOSED_UPDATER.compareAndSet(this, 0, 1)) {
             try {
+                // Already closed by failure or shutdown so no need to queue 
task
                 if (!closeFuture.isDone()) {
                     executor.execute(() -> {
                         LOG.trace("Close requested for connection: {}", this);
@@ -215,19 +216,9 @@ public class ClientConnection implements Connection {
                     });
                 }
             } catch (RejectedExecutionException rje) {
+                // If the engine already shutdown due to the remote dropping 
then we
+                // can just ignore that and continue as if everything already 
was closed.
                 LOG.trace("Close task rejected from the event loop", rje);
-            } finally {
-                try {
-                    closeFuture.get();
-                } catch (InterruptedException | ExecutionException e) {
-                    // Ignore error as we are closed regardless
-                } finally {
-                    try {
-                        transport.close();
-                    } catch (Exception ignore) {}
-
-                    ioContext.shutdown();
-                }
             }
         }
 
@@ -727,6 +718,11 @@ public class ClientConnection implements Connection {
         LOG.trace("Engine reports failure with error: {}", 
failureCause.getMessage());
 
         if (isReconnectAllowed(failureCause)) {
+            // Disconnect the failed engine for this connection's event 
handling
+            // to prevent cleanup processing of that engine instance from 
triggering
+            // normal connection shutdown processing.
+            engine.shutdownHandler(null);
+
             LOG.info("Connection {} interrupted to server: {}:{}", getId(), 
transport.getHost(), transport.getPort());
             submitDisconnectionEvent(options.interruptedHandler(), 
transport.getHost(), transport.getPort(), failureCause);
 
@@ -739,12 +735,14 @@ public class ClientConnection implements Connection {
                 initializeProtonResources(remoteLocation);
                 scheduleReconnect(remoteLocation);
             } catch (ClientException initError) {
-                
failConnection(ClientExceptionSupport.createOrPassthroughFatal(initError));
+                FAILURE_CAUSE_UPDATER.compareAndSet(this, null, 
ClientExceptionSupport.createOrPassthroughFatal(initError));
+                this.engine.shutdown();  // Close down the engine created for 
reconnect
             } finally {
-                engine.shutdown();
+                engine.shutdown(); // Failed instance gets cleaned up.
             }
         } else {
-            failConnection(failureCause);
+            FAILURE_CAUSE_UPDATER.compareAndSet(this, null, failureCause);
+            engine.shutdown();
         }
     }
 
@@ -754,23 +752,33 @@ public class ClientConnection implements Connection {
      * with reconnect cases and avoid this event unless reconnect cannot 
proceed.
      */
     private void handleEngineShutdown(Engine engine) {
-        // Only handle this on normal shutdown failure will perform its own 
controlled shutdown
-        // and or reconnection logic which this method should avoid 
interfering with.
-        if (engine.failureCause() == null) {
-            try {
-                protonConnection.close();
-            } catch (Exception ignore) {
-            }
+        try {
+            protonConnection.close();
+        } catch (Exception ignore) {}
 
-            try {
-                transport.close();
-            } catch (Exception ignored) {}
+        try {
+            transport.close();
+        } catch (Exception ignore) {}
+
+        ioContext.shutdownAsync();
+
+        if (failureCause != null)
+        {
+            openFuture.failed(failureCause);
+            closeFuture.complete(this);
 
-            client.unregisterConnection(this);
+            LOG.warn("Connection {} has failed due to: {}", getId(), 
failureCause != null ?
+                    failureCause.getClass().getSimpleName() + " -> " + 
failureCause.getMessage() : "No failure details provided.");
 
+           submitDisconnectionEvent(options.disconnectedHandler(), 
transport.getHost(), transport.getPort(), failureCause);
+        }
+        else
+        {
             openFuture.complete(this);
             closeFuture.complete(this);
         }
+
+        client.unregisterConnection(this);
     }
 
     private void submitConnectionEvent(BiConsumer<Connection, ConnectionEvent> 
handler, String host, int port, ClientIOException cause) {
@@ -805,26 +813,6 @@ public class ClientConnection implements Connection {
         }
     }
 
-    private void failConnection(ClientIOException failureCause) {
-        FAILURE_CAUSE_UPDATER.compareAndSet(this, null, failureCause);
-
-        try {
-            protonConnection.close();
-        } catch (Exception ignore) {}
-
-        try {
-            engine.shutdown();
-        } catch (Exception ignore) {}
-
-        openFuture.failed(failureCause);
-        closeFuture.complete(this);
-
-        LOG.warn("Connection {} has failed due to: {}", getId(), failureCause 
!= null ?
-                 failureCause.getClass().getSimpleName() + " -> " + 
failureCause.getMessage() : "No failure details provided.");
-
-        submitDisconnectionEvent(options.disconnectedHandler(), 
transport.getHost(), transport.getPort(), failureCause);
-    }
-
     private Engine configureEngineSaslSupport() {
         if (options.saslOptions().saslEnabled()) {
             SaslMechanismSelector mechSelector =
diff --git 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientInstance.java
 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientInstance.java
index acd6ff9..e91f731 100644
--- 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientInstance.java
+++ 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientInstance.java
@@ -24,6 +24,7 @@ import java.util.Objects;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import org.apache.qpid.protonj2.client.Client;
 import org.apache.qpid.protonj2.client.ClientOptions;
@@ -47,6 +48,8 @@ public final class ClientInstance implements Client {
 
     private static final IdGenerator CONTAINER_ID_GENERATOR = new 
IdGenerator();
     private static final ClientFutureFactory FUTURES = 
ClientFutureFactory.create(ClientFutureFactory.CONSERVATIVE);
+    private static final AtomicIntegerFieldUpdater<ClientInstance> 
CLOSED_UPDATER =
+            AtomicIntegerFieldUpdater.newUpdater(ClientInstance.class, 
"closed");
 
     private final AtomicInteger CONNECTION_COUNTER = new AtomicInteger();
     private final ClientOptions options;
@@ -55,7 +58,7 @@ public final class ClientInstance implements Client {
     private final String clientUniqueId = CONTAINER_ID_GENERATOR.generateId();
     private final ClientFuture<Client> closedFuture = FUTURES.createFuture();
 
-    private volatile boolean closed;
+    private volatile int closed;
 
     /**
      * @return a newly create {@link ClientInstance} that uses default 
configuration.
@@ -136,22 +139,20 @@ public final class ClientInstance implements Client {
 
     @Override
     public synchronized Future<Client> closeAsync() {
-        if (!closed) {
-            closed = true;
-
+        if (CLOSED_UPDATER.compareAndSet(this, 0, 1)) {
             if (connections.isEmpty()) {
                 closedFuture.complete(this);
             } else {
+                // Make a copy as the connection close will modify the 
connections
+                // Map as each connection closes and removes itself from the 
container.
                 List<Connection> connectionsView = new 
ArrayList<>(connections.values());
-                connectionsView.forEach((connection) -> connection.close());
-
-                for (Connection connection : connectionsView) {
+                connectionsView.forEach((connection) -> {
                     try {
-                        connection.close();
+                        connection.closeAsync();
                     } catch (Throwable ignored) {
                         LOG.trace("Error while closing connection, ignoring", 
ignored);
                     }
-                }
+                });
             }
         }
 
@@ -160,8 +161,12 @@ public final class ClientInstance implements Client {
 
     //----- Internal API
 
+    boolean isClosed() {
+        return closed > 0;
+    }
+
     private void checkClosed() throws ClientIllegalStateException {
-        if (closed) {
+        if (isClosed()) {
             throw new ClientIllegalStateException("Cannot create new 
connections, the Client has been closed.");
         }
     }
@@ -170,17 +175,15 @@ public final class ClientInstance implements Client {
         return getClientUniqueId() + ":" + 
CONNECTION_COUNTER.incrementAndGet();
     }
 
-    private ClientConnection addConnection(ClientConnection connection) {
+    private synchronized ClientConnection addConnection(ClientConnection 
connection) {
         connections.put(connection.getId(), connection);
         return connection;
     }
 
-    void unregisterConnection(ClientConnection connection) {
-        synchronized (connections) {
-            connections.remove(connection.getId());
-            if (closed && connections.isEmpty()) {
-                closedFuture.complete(this);
-            }
+    synchronized void unregisterConnection(ClientConnection connection) {
+        connections.remove(connection.getId());
+        if (isClosed() && connections.isEmpty()) {
+            closedFuture.complete(this);
         }
     }
 }
diff --git 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiver.java
 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiver.java
index 480e6dc..2142999 100644
--- 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiver.java
+++ 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiver.java
@@ -247,6 +247,7 @@ public final class ClientReceiver implements Receiver {
 
     private ClientFuture<Receiver> doCloseOrDetach(boolean close, 
ErrorCondition error) {
         if (CLOSED_UPDATER.compareAndSet(this, 0, 1)) {
+            // Already closed by failure or shutdown so no need to queue task
             if (!closeFuture.isDone()) {
                 executor.execute(() -> {
                     if (protonReceiver.isLocallyOpen()) {
diff --git 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java
 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java
index cf75424..3d3a80f 100644
--- 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java
+++ 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java
@@ -206,21 +206,24 @@ class ClientSender implements Sender {
 
     private ClientFuture<Sender> doCloseOrDetach(boolean close, ErrorCondition 
error) {
         if (CLOSED_UPDATER.compareAndSet(this, 0, 1)) {
-            executor.execute(() -> {
-                if (protonSender.isLocallyOpen()) {
-                    try {
-                        
protonSender.setCondition(ClientErrorCondition.asProtonErrorCondition(error));
-
-                        if (close) {
-                            protonSender.close();
-                        } else {
-                            protonSender.detach();
+            // Already closed by failure or shutdown so no need to queue task
+            if (!closeFuture.isDone()) {
+                executor.execute(() -> {
+                    if (protonSender.isLocallyOpen()) {
+                        try {
+                            
protonSender.setCondition(ClientErrorCondition.asProtonErrorCondition(error));
+
+                            if (close) {
+                                protonSender.close();
+                            } else {
+                                protonSender.detach();
+                            }
+                        } catch (Throwable ignore) {
+                            closeFuture.complete(this);
                         }
-                    } catch (Throwable ignore) {
-                        closeFuture.complete(this);
                     }
-                }
-            });
+                });
+            }
         }
         return closeFuture;
     }
diff --git 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSession.java
 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSession.java
index f372eb2..1681d96 100644
--- 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSession.java
+++ 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSession.java
@@ -136,7 +136,7 @@ public class ClientSession implements Session {
 
     private Future<Session> doClose(ErrorCondition error) {
         if (CLOSED_UPDATER.compareAndSet(this, 0, 1)) {
-            // Already closed by failure or shutdown so no need to
+            // Already closed by failure or shutdown so no need to queue task
             if (!closeFuture.isDone()) {
                 serializer.execute(() -> {
                     if (protonSession.isLocallyOpen()) {
diff --git 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamReceiver.java
 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamReceiver.java
index 3e51e19..b1ae55c 100644
--- 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamReceiver.java
+++ 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamReceiver.java
@@ -175,21 +175,24 @@ public final class ClientStreamReceiver implements 
StreamReceiver {
 
     private ClientFuture<Receiver> doCloseOrDetach(boolean close, 
ErrorCondition error) {
         if (CLOSED_UPDATER.compareAndSet(this, 0, 1)) {
-            executor.execute(() -> {
-                if (protonReceiver.isLocallyOpen()) {
-                    try {
-                        
protonReceiver.setCondition(ClientErrorCondition.asProtonErrorCondition(error));
-
-                        if (close) {
-                            protonReceiver.close();
-                        } else {
-                            protonReceiver.detach();
+            // Already closed by failure or shutdown so no need to queue task
+            if (!closeFuture.isDone()) {
+                executor.execute(() -> {
+                    if (protonReceiver.isLocallyOpen()) {
+                        try {
+                            
protonReceiver.setCondition(ClientErrorCondition.asProtonErrorCondition(error));
+
+                            if (close) {
+                                protonReceiver.close();
+                            } else {
+                                protonReceiver.detach();
+                            }
+                        } catch (Throwable ignore) {
+                            closeFuture.complete(this);
                         }
-                    } catch (Throwable ignore) {
-                        closeFuture.complete(this);
                     }
-                }
-            });
+                });
+            }
         }
 
         return closeFuture;
@@ -652,6 +655,8 @@ public final class ClientStreamReceiver implements 
StreamReceiver {
     }
 
     private void immediateLinkShutdown(ClientException failureCause) {
+        CLOSED_UPDATER.set(this, 1);
+
         if (this.failureCause == null) {
             this.failureCause = failureCause;
         }
diff --git 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
index abc2127..0edebac 100644
--- 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
+++ 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
@@ -117,6 +117,8 @@ public final class ClientStreamSender extends ClientSender 
implements StreamSend
     }
 
     StreamTracker sendMessage(ClientStreamSenderMessage context, ProtonBuffer 
payload, int messageFormat) throws ClientException {
+        checkClosedOrFailed();
+
         final ClientFuture<Tracker> operation = 
session.getFutureFactory().createFuture();
         final ProtonBuffer buffer = payload;
         final ClientOutgoingEnvelope envelope = new ClientOutgoingEnvelope(
diff --git 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/NettyIOContext.java
 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/NettyIOContext.java
index 072ba82..18de96e 100644
--- 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/NettyIOContext.java
+++ 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/NettyIOContext.java
@@ -42,6 +42,8 @@ public final class NettyIOContext {
     private static final Logger LOG = 
LoggerFactory.getLogger(NettyIOContext.class);
 
     private static final int SHUTDOWN_TIMEOUT = 50;
+    private static final int ASYNC_SHUTDOWN_TIMEOUT = 100;
+    private static final int ASYNC_SHUTDOWN_QUIET_PERIOD = 10;
 
     private final EventLoopGroup group;
     private final Class<? extends Channel> channelClass;
@@ -111,6 +113,17 @@ public final class NettyIOContext {
         }
     }
 
+    /**
+     * Shutdown the event loop asynchronously with a grace period for work 
that might be in-bound
+     * at the time of termination.  This is safe to call from inside the event 
loop where the
+     * standard blocking shutdown API is not.
+     */
+    public void shutdownAsync() {
+        if (!group.isShutdown()) {
+            group.shutdownGracefully(ASYNC_SHUTDOWN_QUIET_PERIOD, 
ASYNC_SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
+        }
+    }
+
     public EventLoopGroup eventLoop() {
         return group;
     }
diff --git 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ClientTest.java
 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ClientTest.java
index d1627cf..a14472d 100644
--- 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ClientTest.java
+++ 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ClientTest.java
@@ -23,7 +23,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
 import java.net.URI;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.qpid.protonj2.client.Client;
 import org.apache.qpid.protonj2.client.ClientOptions;
@@ -35,6 +37,8 @@ import 
org.apache.qpid.protonj2.client.test.ImperativeClientTestCase;
 import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Test the Client API implementation
@@ -42,6 +46,8 @@ import org.junit.jupiter.api.Timeout;
 @Timeout(20)
 public class ClientTest extends ImperativeClientTestCase {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(ClientTest.class);
+
     @Test
     public void testCreateWithNoContainerIdIsAllowed() {
         ClientOptions options = new ClientOptions();
@@ -109,6 +115,40 @@ public class ClientTest extends ImperativeClientTestCase {
     }
 
     @Test
+    public void testCloseAsyncDoesNotBlockWaitingOnConnectionClose() throws 
Exception {
+        try (ProtonTestServer peer = new ProtonTestServer()) {
+            peer.expectSASLAnonymousConnect();
+            peer.expectOpen().respond();
+            peer.expectClose();
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Connect test started, peer listening on: {}", remoteURI);
+
+            Client container = Client.create();
+            Connection connection = container.connect(remoteURI.getHost(), 
remoteURI.getPort());
+
+            connection.openFuture().get();
+
+            Future<Client> closedFuture = container.closeAsync();
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+            try {
+                closedFuture.get(50, TimeUnit.MILLISECONDS);
+                fail("should have timed out waiting on close.");
+            } catch (TimeoutException ex) {
+            }
+
+            peer.remoteClose().now();
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+            assertNotNull(closedFuture.get(1, TimeUnit.SECONDS));
+        }
+    }
+
+    @Test
     public void testCloseAllConnectionAndWait() throws Exception {
         try (ProtonTestServer firstPeer = new ProtonTestServer();
              ProtonTestServer secondPeer = new ProtonTestServer()) {
@@ -135,7 +175,7 @@ public class ClientTest extends ImperativeClientTestCase {
             secondPeer.waitForScriptToComplete();
 
             firstPeer.expectClose().respond().afterDelay(10);
-            secondPeer.expectClose().respond().afterDelay(11);
+            secondPeer.expectClose().respond().afterDelay(15);
 
             container.closeAsync().get(5, TimeUnit.SECONDS);
 

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to