This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push:
new 9a49692 PROTON-2513 Perform all the close work asynchronously when
requested
9a49692 is described below
commit 9a49692b18334fa0913aafa56767e4489e06002b
Author: Timothy Bish <[email protected]>
AuthorDate: Fri Mar 4 14:50:07 2022 -0500
PROTON-2513 Perform all the close work asynchronously when requested
Ensures that Client and Connection closeAsync are truly asynchronous and
will complete the close future once the remote responds to a close
request if not already closed. Fixes some races and ensures that work
is not added to the event loop when not necessary for already closed
resources.
---
.../protonj2/client/impl/ClientConnection.java | 86 ++++++++++------------
.../qpid/protonj2/client/impl/ClientInstance.java | 37 +++++-----
.../qpid/protonj2/client/impl/ClientReceiver.java | 1 +
.../qpid/protonj2/client/impl/ClientSender.java | 29 ++++----
.../qpid/protonj2/client/impl/ClientSession.java | 2 +-
.../protonj2/client/impl/ClientStreamReceiver.java | 31 ++++----
.../protonj2/client/impl/ClientStreamSender.java | 2 +
.../protonj2/client/transport/NettyIOContext.java | 13 ++++
.../qpid/protonj2/client/impl/ClientTest.java | 42 ++++++++++-
9 files changed, 149 insertions(+), 94 deletions(-)
diff --git
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java
index 184f7bf..9715ecf 100644
---
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java
+++
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java
@@ -89,8 +89,8 @@ public class ClientConnection implements Connection {
// later on we may decide this is overly optimized.
private static final AtomicIntegerFieldUpdater<ClientConnection>
CLOSED_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(ClientConnection.class,
"closed");
- private static final AtomicReferenceFieldUpdater<ClientConnection,
ClientException> FAILURE_CAUSE_UPDATER =
- AtomicReferenceFieldUpdater.newUpdater(ClientConnection.class,
ClientException.class, "failureCause");
+ private static final AtomicReferenceFieldUpdater<ClientConnection,
ClientIOException> FAILURE_CAUSE_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(ClientConnection.class,
ClientIOException.class, "failureCause");
private final ClientInstance client;
private final ConnectionOptions options;
@@ -113,7 +113,7 @@ public class ClientConnection implements Connection {
private ClientFuture<Connection> openFuture;
private ClientFuture<Connection> closeFuture;
private volatile int closed;
- private volatile ClientException failureCause;
+ private volatile ClientIOException failureCause;
private long totalConnections;
private long reconnectAttempts;
private long nextReconnectDelay = -1;
@@ -197,6 +197,7 @@ public class ClientConnection implements Connection {
private Future<Connection> doClose(ErrorCondition error) {
if (CLOSED_UPDATER.compareAndSet(this, 0, 1)) {
try {
+ // Already closed by failure or shutdown so no need to queue
task
if (!closeFuture.isDone()) {
executor.execute(() -> {
LOG.trace("Close requested for connection: {}", this);
@@ -215,19 +216,9 @@ public class ClientConnection implements Connection {
});
}
} catch (RejectedExecutionException rje) {
+ // If the engine already shutdown due to the remote dropping
then we
+ // can just ignore that and continue as if everything already
was closed.
LOG.trace("Close task rejected from the event loop", rje);
- } finally {
- try {
- closeFuture.get();
- } catch (InterruptedException | ExecutionException e) {
- // Ignore error as we are closed regardless
- } finally {
- try {
- transport.close();
- } catch (Exception ignore) {}
-
- ioContext.shutdown();
- }
}
}
@@ -727,6 +718,11 @@ public class ClientConnection implements Connection {
LOG.trace("Engine reports failure with error: {}",
failureCause.getMessage());
if (isReconnectAllowed(failureCause)) {
+ // Disconnect the failed engine for this connection's event
handling
+ // to prevent cleanup processing of that engine instance from
triggering
+ // normal connection shutdown processing.
+ engine.shutdownHandler(null);
+
LOG.info("Connection {} interrupted to server: {}:{}", getId(),
transport.getHost(), transport.getPort());
submitDisconnectionEvent(options.interruptedHandler(),
transport.getHost(), transport.getPort(), failureCause);
@@ -739,12 +735,14 @@ public class ClientConnection implements Connection {
initializeProtonResources(remoteLocation);
scheduleReconnect(remoteLocation);
} catch (ClientException initError) {
-
failConnection(ClientExceptionSupport.createOrPassthroughFatal(initError));
+ FAILURE_CAUSE_UPDATER.compareAndSet(this, null,
ClientExceptionSupport.createOrPassthroughFatal(initError));
+ this.engine.shutdown(); // Close down the engine created for
reconnect
} finally {
- engine.shutdown();
+ engine.shutdown(); // Failed instance gets cleaned up.
}
} else {
- failConnection(failureCause);
+ FAILURE_CAUSE_UPDATER.compareAndSet(this, null, failureCause);
+ engine.shutdown();
}
}
@@ -754,23 +752,33 @@ public class ClientConnection implements Connection {
* with reconnect cases and avoid this event unless reconnect cannot
proceed.
*/
private void handleEngineShutdown(Engine engine) {
- // Only handle this on normal shutdown failure will perform its own
controlled shutdown
- // and or reconnection logic which this method should avoid
interfering with.
- if (engine.failureCause() == null) {
- try {
- protonConnection.close();
- } catch (Exception ignore) {
- }
+ try {
+ protonConnection.close();
+ } catch (Exception ignore) {}
- try {
- transport.close();
- } catch (Exception ignored) {}
+ try {
+ transport.close();
+ } catch (Exception ignore) {}
+
+ ioContext.shutdownAsync();
+
+ if (failureCause != null)
+ {
+ openFuture.failed(failureCause);
+ closeFuture.complete(this);
- client.unregisterConnection(this);
+ LOG.warn("Connection {} has failed due to: {}", getId(),
failureCause != null ?
+ failureCause.getClass().getSimpleName() + " -> " +
failureCause.getMessage() : "No failure details provided.");
+ submitDisconnectionEvent(options.disconnectedHandler(),
transport.getHost(), transport.getPort(), failureCause);
+ }
+ else
+ {
openFuture.complete(this);
closeFuture.complete(this);
}
+
+ client.unregisterConnection(this);
}
private void submitConnectionEvent(BiConsumer<Connection, ConnectionEvent>
handler, String host, int port, ClientIOException cause) {
@@ -805,26 +813,6 @@ public class ClientConnection implements Connection {
}
}
- private void failConnection(ClientIOException failureCause) {
- FAILURE_CAUSE_UPDATER.compareAndSet(this, null, failureCause);
-
- try {
- protonConnection.close();
- } catch (Exception ignore) {}
-
- try {
- engine.shutdown();
- } catch (Exception ignore) {}
-
- openFuture.failed(failureCause);
- closeFuture.complete(this);
-
- LOG.warn("Connection {} has failed due to: {}", getId(), failureCause
!= null ?
- failureCause.getClass().getSimpleName() + " -> " +
failureCause.getMessage() : "No failure details provided.");
-
- submitDisconnectionEvent(options.disconnectedHandler(),
transport.getHost(), transport.getPort(), failureCause);
- }
-
private Engine configureEngineSaslSupport() {
if (options.saslOptions().saslEnabled()) {
SaslMechanismSelector mechSelector =
diff --git
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientInstance.java
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientInstance.java
index acd6ff9..e91f731 100644
---
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientInstance.java
+++
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientInstance.java
@@ -24,6 +24,7 @@ import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.qpid.protonj2.client.Client;
import org.apache.qpid.protonj2.client.ClientOptions;
@@ -47,6 +48,8 @@ public final class ClientInstance implements Client {
private static final IdGenerator CONTAINER_ID_GENERATOR = new
IdGenerator();
private static final ClientFutureFactory FUTURES =
ClientFutureFactory.create(ClientFutureFactory.CONSERVATIVE);
+ private static final AtomicIntegerFieldUpdater<ClientInstance>
CLOSED_UPDATER =
+ AtomicIntegerFieldUpdater.newUpdater(ClientInstance.class,
"closed");
private final AtomicInteger CONNECTION_COUNTER = new AtomicInteger();
private final ClientOptions options;
@@ -55,7 +58,7 @@ public final class ClientInstance implements Client {
private final String clientUniqueId = CONTAINER_ID_GENERATOR.generateId();
private final ClientFuture<Client> closedFuture = FUTURES.createFuture();
- private volatile boolean closed;
+ private volatile int closed;
/**
* @return a newly create {@link ClientInstance} that uses default
configuration.
@@ -136,22 +139,20 @@ public final class ClientInstance implements Client {
@Override
public synchronized Future<Client> closeAsync() {
- if (!closed) {
- closed = true;
-
+ if (CLOSED_UPDATER.compareAndSet(this, 0, 1)) {
if (connections.isEmpty()) {
closedFuture.complete(this);
} else {
+ // Make a copy as the connection close will modify the
connections
+ // Map as each connection closes and removes itself from the
container.
List<Connection> connectionsView = new
ArrayList<>(connections.values());
- connectionsView.forEach((connection) -> connection.close());
-
- for (Connection connection : connectionsView) {
+ connectionsView.forEach((connection) -> {
try {
- connection.close();
+ connection.closeAsync();
} catch (Throwable ignored) {
LOG.trace("Error while closing connection, ignoring",
ignored);
}
- }
+ });
}
}
@@ -160,8 +161,12 @@ public final class ClientInstance implements Client {
//----- Internal API
+ boolean isClosed() {
+ return closed > 0;
+ }
+
private void checkClosed() throws ClientIllegalStateException {
- if (closed) {
+ if (isClosed()) {
throw new ClientIllegalStateException("Cannot create new
connections, the Client has been closed.");
}
}
@@ -170,17 +175,15 @@ public final class ClientInstance implements Client {
return getClientUniqueId() + ":" +
CONNECTION_COUNTER.incrementAndGet();
}
- private ClientConnection addConnection(ClientConnection connection) {
+ private synchronized ClientConnection addConnection(ClientConnection
connection) {
connections.put(connection.getId(), connection);
return connection;
}
- void unregisterConnection(ClientConnection connection) {
- synchronized (connections) {
- connections.remove(connection.getId());
- if (closed && connections.isEmpty()) {
- closedFuture.complete(this);
- }
+ synchronized void unregisterConnection(ClientConnection connection) {
+ connections.remove(connection.getId());
+ if (isClosed() && connections.isEmpty()) {
+ closedFuture.complete(this);
}
}
}
diff --git
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiver.java
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiver.java
index 480e6dc..2142999 100644
---
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiver.java
+++
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiver.java
@@ -247,6 +247,7 @@ public final class ClientReceiver implements Receiver {
private ClientFuture<Receiver> doCloseOrDetach(boolean close,
ErrorCondition error) {
if (CLOSED_UPDATER.compareAndSet(this, 0, 1)) {
+ // Already closed by failure or shutdown so no need to queue task
if (!closeFuture.isDone()) {
executor.execute(() -> {
if (protonReceiver.isLocallyOpen()) {
diff --git
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java
index cf75424..3d3a80f 100644
---
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java
+++
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java
@@ -206,21 +206,24 @@ class ClientSender implements Sender {
private ClientFuture<Sender> doCloseOrDetach(boolean close, ErrorCondition
error) {
if (CLOSED_UPDATER.compareAndSet(this, 0, 1)) {
- executor.execute(() -> {
- if (protonSender.isLocallyOpen()) {
- try {
-
protonSender.setCondition(ClientErrorCondition.asProtonErrorCondition(error));
-
- if (close) {
- protonSender.close();
- } else {
- protonSender.detach();
+ // Already closed by failure or shutdown so no need to queue task
+ if (!closeFuture.isDone()) {
+ executor.execute(() -> {
+ if (protonSender.isLocallyOpen()) {
+ try {
+
protonSender.setCondition(ClientErrorCondition.asProtonErrorCondition(error));
+
+ if (close) {
+ protonSender.close();
+ } else {
+ protonSender.detach();
+ }
+ } catch (Throwable ignore) {
+ closeFuture.complete(this);
}
- } catch (Throwable ignore) {
- closeFuture.complete(this);
}
- }
- });
+ });
+ }
}
return closeFuture;
}
diff --git
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSession.java
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSession.java
index f372eb2..1681d96 100644
---
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSession.java
+++
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSession.java
@@ -136,7 +136,7 @@ public class ClientSession implements Session {
private Future<Session> doClose(ErrorCondition error) {
if (CLOSED_UPDATER.compareAndSet(this, 0, 1)) {
- // Already closed by failure or shutdown so no need to
+ // Already closed by failure or shutdown so no need to queue task
if (!closeFuture.isDone()) {
serializer.execute(() -> {
if (protonSession.isLocallyOpen()) {
diff --git
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamReceiver.java
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamReceiver.java
index 3e51e19..b1ae55c 100644
---
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamReceiver.java
+++
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamReceiver.java
@@ -175,21 +175,24 @@ public final class ClientStreamReceiver implements
StreamReceiver {
private ClientFuture<Receiver> doCloseOrDetach(boolean close,
ErrorCondition error) {
if (CLOSED_UPDATER.compareAndSet(this, 0, 1)) {
- executor.execute(() -> {
- if (protonReceiver.isLocallyOpen()) {
- try {
-
protonReceiver.setCondition(ClientErrorCondition.asProtonErrorCondition(error));
-
- if (close) {
- protonReceiver.close();
- } else {
- protonReceiver.detach();
+ // Already closed by failure or shutdown so no need to queue task
+ if (!closeFuture.isDone()) {
+ executor.execute(() -> {
+ if (protonReceiver.isLocallyOpen()) {
+ try {
+
protonReceiver.setCondition(ClientErrorCondition.asProtonErrorCondition(error));
+
+ if (close) {
+ protonReceiver.close();
+ } else {
+ protonReceiver.detach();
+ }
+ } catch (Throwable ignore) {
+ closeFuture.complete(this);
}
- } catch (Throwable ignore) {
- closeFuture.complete(this);
}
- }
- });
+ });
+ }
}
return closeFuture;
@@ -652,6 +655,8 @@ public final class ClientStreamReceiver implements
StreamReceiver {
}
private void immediateLinkShutdown(ClientException failureCause) {
+ CLOSED_UPDATER.set(this, 1);
+
if (this.failureCause == null) {
this.failureCause = failureCause;
}
diff --git
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
index abc2127..0edebac 100644
---
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
+++
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
@@ -117,6 +117,8 @@ public final class ClientStreamSender extends ClientSender
implements StreamSend
}
StreamTracker sendMessage(ClientStreamSenderMessage context, ProtonBuffer
payload, int messageFormat) throws ClientException {
+ checkClosedOrFailed();
+
final ClientFuture<Tracker> operation =
session.getFutureFactory().createFuture();
final ProtonBuffer buffer = payload;
final ClientOutgoingEnvelope envelope = new ClientOutgoingEnvelope(
diff --git
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/NettyIOContext.java
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/NettyIOContext.java
index 072ba82..18de96e 100644
---
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/NettyIOContext.java
+++
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/NettyIOContext.java
@@ -42,6 +42,8 @@ public final class NettyIOContext {
private static final Logger LOG =
LoggerFactory.getLogger(NettyIOContext.class);
private static final int SHUTDOWN_TIMEOUT = 50;
+ private static final int ASYNC_SHUTDOWN_TIMEOUT = 100;
+ private static final int ASYNC_SHUTDOWN_QUIET_PERIOD = 10;
private final EventLoopGroup group;
private final Class<? extends Channel> channelClass;
@@ -111,6 +113,17 @@ public final class NettyIOContext {
}
}
+ /**
+ * Shutdown the event loop asynchronously with a grace period for work
that might be in-bound
+ * at the time of termination. This is safe to call from inside the event
loop where the
+ * standard blocking shutdown API is not.
+ */
+ public void shutdownAsync() {
+ if (!group.isShutdown()) {
+ group.shutdownGracefully(ASYNC_SHUTDOWN_QUIET_PERIOD,
ASYNC_SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
+ }
+ }
+
public EventLoopGroup eventLoop() {
return group;
}
diff --git
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ClientTest.java
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ClientTest.java
index d1627cf..a14472d 100644
---
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ClientTest.java
+++
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ClientTest.java
@@ -23,7 +23,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.net.URI;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.qpid.protonj2.client.Client;
import org.apache.qpid.protonj2.client.ClientOptions;
@@ -35,6 +37,8 @@ import
org.apache.qpid.protonj2.client.test.ImperativeClientTestCase;
import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Test the Client API implementation
@@ -42,6 +46,8 @@ import org.junit.jupiter.api.Timeout;
@Timeout(20)
public class ClientTest extends ImperativeClientTestCase {
+ private static final Logger LOG =
LoggerFactory.getLogger(ClientTest.class);
+
@Test
public void testCreateWithNoContainerIdIsAllowed() {
ClientOptions options = new ClientOptions();
@@ -109,6 +115,40 @@ public class ClientTest extends ImperativeClientTestCase {
}
@Test
+ public void testCloseAsyncDoesNotBlockWaitingOnConnectionClose() throws
Exception {
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectClose();
+ peer.start();
+
+ URI remoteURI = peer.getServerURI();
+
+ LOG.info("Connect test started, peer listening on: {}", remoteURI);
+
+ Client container = Client.create();
+ Connection connection = container.connect(remoteURI.getHost(),
remoteURI.getPort());
+
+ connection.openFuture().get();
+
+ Future<Client> closedFuture = container.closeAsync();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ try {
+ closedFuture.get(50, TimeUnit.MILLISECONDS);
+ fail("should have timed out waiting on close.");
+ } catch (TimeoutException ex) {
+ }
+
+ peer.remoteClose().now();
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ assertNotNull(closedFuture.get(1, TimeUnit.SECONDS));
+ }
+ }
+
+ @Test
public void testCloseAllConnectionAndWait() throws Exception {
try (ProtonTestServer firstPeer = new ProtonTestServer();
ProtonTestServer secondPeer = new ProtonTestServer()) {
@@ -135,7 +175,7 @@ public class ClientTest extends ImperativeClientTestCase {
secondPeer.waitForScriptToComplete();
firstPeer.expectClose().respond().afterDelay(10);
- secondPeer.expectClose().respond().afterDelay(11);
+ secondPeer.expectClose().respond().afterDelay(15);
container.closeAsync().get(5, TimeUnit.SECONDS);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]