This is an automated email from the ASF dual-hosted git repository. spmallette pushed a commit to branch TINKERPOP-2813 in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit 4401f0163a0af290d95b7336da98cb3191355c9b Author: Stephen Mallette <[email protected]> AuthorDate: Mon Nov 28 13:30:08 2022 -0500 TINKERPOP-2813 Fixed some issues with Cluster.close() After refactoring to add the new thread pools, some situations seemed to arise where connections didn't close cleanly. Not clear if this came as a result of the thread pool changes or if they simply were always present but hidden behind use of fork/join pool and the normal gremlin pool. Still don't quite have a good async shutdown going but that wasn't quite working that way prior to these changes so perhaps that is better saved for a dedicated body of work. --- .../apache/tinkerpop/gremlin/driver/Cluster.java | 38 +++++++++++++++------- .../tinkerpop/gremlin/driver/Connection.java | 28 +++++++++------- .../tinkerpop/gremlin/driver/ConnectionPool.java | 19 ++++++++--- .../driver/ClientConnectionIntegrateTest.java | 4 ++- 4 files changed, 61 insertions(+), 28 deletions(-) diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java index e93074d3fc..7392deb7d5 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java @@ -24,6 +24,7 @@ import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.SslProvider; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import io.netty.util.concurrent.Future; import org.apache.commons.configuration2.Configuration; import org.apache.tinkerpop.gremlin.driver.message.RequestMessage; import org.apache.tinkerpop.gremlin.driver.ser.Serializers; @@ -60,6 +61,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import 
java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -1021,10 +1023,14 @@ public final class Cluster { return b; } - void shutdown() { + /** + * Gracefully shuts down the event loop and returns the termination future which signals that all jobs are done. + */ + Future<?> shutdown() { // Do not provide a quiet period (default is 2s) to accept more requests. Once we have decided to shutdown, // no new requests should be accepted. - group.shutdownGracefully(/*quiet period*/0, /*timeout*/2, TimeUnit.SECONDS).awaitUninterruptibly(); + group.shutdownGracefully(/*quiet period*/0, /*timeout*/2, TimeUnit.SECONDS); + return group.terminationFuture(); } } @@ -1219,25 +1225,33 @@ public final class Cluster { if (closeFuture.get() != null) return closeFuture.get(); + final List<CompletableFuture<Void>> clientCloseFutures = new ArrayList<>(openedClients.size()); for (WeakReference<Client> openedClient : openedClients) { final Client client = openedClient.get(); - if (client != null && !client.isClosing()) { - client.close(); + if (client != null) { + // best to call closeAsync() even if the Client is already closing so that we can be sure that + // any background client closing operations are included in this shutdown future + clientCloseFutures.add(client.closeAsync()); } } - final CompletableFuture<Void> closeIt = new CompletableFuture<>(); - closeFuture.set(closeIt); + // when all the clients are fully closed then shut down the netty event loop. not sure why this needs to + // block here, but if it doesn't then factory.shutdown() below doesn't seem to want to ever complete.
+ // ideally, this should all be async, but I guess it wasn't before this change so just going to leave it + // for now as this really isn't the focus of this change + CompletableFuture.allOf(clientCloseFutures.toArray(new CompletableFuture[0])).join(); - hostScheduler.submit(() -> { - factory.shutdown(); + final CompletableFuture<Void> closeIt = new CompletableFuture<>(); + // shut down the event loop. that shutdown can trigger some final jobs to get scheduled so add a listener + // to the termination event to shut down the remaining thread pools + factory.shutdown().awaitUninterruptibly().addListener(f -> { + executor.shutdown(); + hostScheduler.shutdown(); + connectionScheduler.shutdown(); closeIt.complete(null); }); - // Prevent the executor from accepting new tasks while still allowing enqueued tasks to complete - executor.shutdown(); - connectionScheduler.shutdown(); - hostScheduler.shutdown(); + closeFuture.set(closeIt); return closeIt; } diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java index 0dfe0dbab3..1183a3bdc4 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java @@ -29,6 +29,7 @@ import io.netty.channel.ChannelPromise; import io.netty.channel.socket.nio.NioSocketChannel; import java.net.URI; +import java.time.Instant; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -56,6 +57,8 @@ final class Connection { private final Cluster cluster; private final Client client; private final ConnectionPool pool; + private final String creatingThread; + private final String createdTimestamp; public static final int MAX_IN_PROCESS = 4; public static final int MIN_IN_PROCESS = 1; @@ -95,7 +98,8 @@ final class Connection { this.client = pool.getClient();
this.pool = pool; this.maxInProcess = maxInProcess; - + this.creatingThread = Thread.currentThread().getName(); + this.createdTimestamp = Instant.now().toString(); connectionLabel = "Connection{host=" + pool.host + "}"; if (cluster.isClosing()) @@ -302,8 +306,6 @@ final class Connection { // guess). that seems to put the executor thread in a monitor state that it doesn't recover from. since all // the code in here is behind shutdownInitiated the synchronized doesn't seem necessary if (shutdownInitiated.compareAndSet(false, true)) { - final String connectionInfo = this.getConnectionInfo(); - // the session close message was removed in 3.5.0 after deprecation at 3.3.11. That removal was perhaps // a bit hasty as session semantics may still require this message in certain cases. Until we can look // at this in more detail, it seems best to bring back the old functionality to the driver. @@ -342,7 +344,8 @@ final class Connection { channelizer.close(channel); // seems possible that the channelizer could initialize but fail to produce a channel, so worth checking - // null before proceeding here + // null before proceeding here. also if the cluster is in shutdown then the event loop could be shutdown + // already and there will be no way to get a new promise out there. 
if (channel != null) { final ChannelPromise promise = channel.newPromise(); promise.addListener(f -> { @@ -350,7 +353,7 @@ final class Connection { future.completeExceptionally(f.cause()); } else { if (logger.isDebugEnabled()) - logger.debug("{} destroyed successfully.", connectionInfo); + logger.debug("{} destroyed successfully.", this.getConnectionInfo()); future.complete(null); } @@ -365,9 +368,12 @@ final class Connection { } } } else { - logger.debug("Connection {} is shutting down but the channel was not initialized to begin with", - getConnectionInfo()); + // if we don't handle the supplied future it can hang the close + future.complete(null); } + } else { + // if we don't handle the supplied future it can hang the close + future.complete(null); } } @@ -385,10 +391,10 @@ final class Connection { */ public String getConnectionInfo(final boolean showHost) { return showHost ? - String.format("Connection{channel=%s host=%s isDead=%s borrowed=%s pending=%s markedReplaced=%s closing=%s}", - getChannelId(), pool.host.toString(), isDead(), this.borrowed.get(), getPending().size(), this.isBeingReplaced, isClosing()) : - String.format("Connection{channel=%s isDead=%s borrowed=%s pending=%s markedReplaced=%s closing=%s}", - getChannelId(), isDead(), this.borrowed.get(), getPending().size(), this.isBeingReplaced, isClosing()); + String.format("Connection{channel=%s host=%s isDead=%s borrowed=%s pending=%s markedReplaced=%s closing=%s created=%s thread=%s}", + getChannelId(), pool.host.toString(), isDead(), this.borrowed.get(), getPending().size(), this.isBeingReplaced, isClosing(), createdTimestamp, creatingThread) : + String.format("Connection{channel=%s isDead=%s borrowed=%s pending=%s markedReplaced=%s closing=%s created=%s thread=%s}", + getChannelId(), isDead(), this.borrowed.get(), getPending().size(), this.isBeingReplaced, isClosing(), createdTimestamp, creatingThread); } /** diff --git
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java index 4e5f019273..0ec3a4c3af 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java @@ -272,14 +272,25 @@ final class ConnectionPool { return bin.size(); } + /** + * Calls close on connections in the pool, gathering close futures from both active connections and ones in the + * bin. + */ private CompletableFuture<Void> killAvailableConnections() { - final List<CompletableFuture<Void>> futures = new ArrayList<>(connections.size()); + final List<CompletableFuture<Void>> futures = new ArrayList<>(connections.size() + bin.size()); for (Connection connection : connections) { final CompletableFuture<Void> future = connection.closeAsync(); future.thenRun(open::decrementAndGet); futures.add(future); } + // Without the ones in the bin, the close for the ConnectionPool won't account for their shutdown and could + // lead to a scenario where the bin connections stay open after the channel executor is closed, which then + // leads to the close operation getting rejected in Connection.close() for channel.newPromise().
+ for (Connection connection : bin) { + futures.add(connection.closeAsync()); + } + return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); } @@ -368,7 +379,7 @@ final class ConnectionPool { private boolean destroyConnection(final Connection connection) { while (true) { - int opened = open.get(); + final int opened = open.get(); if (opened <= minPoolSize) return false; @@ -598,8 +609,8 @@ final class ConnectionPool { } else { final int connectionCount = connections.size(); sb.append(System.lineSeparator()); - sb.append(String.format("Connection Pool Status (size=%s max=%s min=%s toCreate=%s markedOpen=%s bin=%s)", - connectionCount, maxPoolSize, minPoolSize, this.scheduledForCreation.get(), this.open.get(), bin.size())); + sb.append(String.format("Connection Pool Status (size=%s max=%s min=%s toCreate=%s bin=%s)", + connectionCount, maxPoolSize, minPoolSize, this.scheduledForCreation.get(), bin.size())); sb.append(System.lineSeparator()); appendConnections(sb, connectionToCallout, connections); diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java index 1417409c47..8899db2e9e 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java @@ -184,6 +184,8 @@ public class ClientConnectionIntegrateTest extends AbstractGremlinServerIntegrat // if there was a exception in the worker thread, then it had better be a TimeoutException assertThat(hadFailOtherThanTimeout.get(), is(false)); + connectionFactory.jittery = false; + cluster.close(); } @@ -212,7 +214,7 @@ public class ClientConnectionIntegrateTest extends AbstractGremlinServerIntegrat if (jittery && connectionsCreated.incrementAndGet() % numberOfConnectionsBetweenErrors == 0) { 
connectionFailures.incrementAndGet(); throw new ConnectionException(pool.host.getHostUri(), - new SSLHandshakeException("SSL on the funk - server is big mad")); + new SSLHandshakeException("SSL on the funk - server is big mad with the jitters")); } return ConnectionFactory.super.create(pool);
