This is an automated email from the ASF dual-hosted git repository. spmallette pushed a commit to branch TINKERPOP-3181 in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit 79b04934e994b31775d6e0318f2352ac9c98d952 Author: Stephen Mallette <[email protected]> AuthorDate: Fri Aug 15 14:43:24 2025 -0400 TINKERPOP-3181 Improved host availability on initialization. If the connection pool creates at least one good connection on initialization, allow the Client to have an available Host even if it's less than the minPoolSize. Allow normal recovery options to attempt reconnects and fall into unavailability the standard ways. --- CHANGELOG.asciidoc | 2 + .../apache/tinkerpop/gremlin/driver/Client.java | 5 +- .../tinkerpop/gremlin/driver/ConnectionPool.java | 31 ++++--- .../driver/ClientConnectionIntegrateTest.java | 96 ++++++++++++++++++++++ 4 files changed, 122 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 56bcfc9c52..cb490db909 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -23,6 +23,8 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima [[release-3-7-5]] === TinkerPop 3.7.5 (Release Date: NOT OFFICIALLY RELEASED YET) +* Improved Java driver host availability on connection pool initialization. 
+ [[release-3-7-4]] === TinkerPop 3.7.4 (Release Date: August 1, 2025) diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java index b8dcb4c636..fc706b8536 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java @@ -52,6 +52,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import java.util.function.Supplier; import java.util.stream.Collectors; import static org.apache.tinkerpop.gremlin.driver.RequestOptions.getRequestOptions; @@ -433,6 +434,7 @@ public abstract class Client { */ public final static class ClusteredClient extends Client { + Supplier<ConnectionFactory> connectionFactorySupplier = ConnectionFactory.DefaultConnectionFactory::new; final ConcurrentMap<Host, ConnectionPool> hostConnectionPools = new ConcurrentHashMap<>(); private final AtomicReference<CompletableFuture<Void>> closing = new AtomicReference<>(null); private Throwable initializationFailure = null; @@ -597,7 +599,8 @@ public abstract class Client { private Consumer<Host> initializeConnectionSetupForHost = host -> { try { // hosts that don't initialize connection pools will come up as a dead host. - hostConnectionPools.put(host, new ConnectionPool(host, ClusteredClient.this)); + hostConnectionPools.put(host, new ConnectionPool(host, ClusteredClient.this, + Optional.empty(), Optional.empty(), connectionFactorySupplier.get())); // hosts are not marked as available at cluster initialization and are made available here instead. 
host.makeAvailable(); diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java index ed776843a1..97128fbaf7 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java @@ -148,21 +148,30 @@ final class ConnectionPool { logger.warn("Initialization of connections cancelled for {}", this.getPoolInfo(), ce); throw ce; } catch (CompletionException ce) { - // Some connections might have been initialized. Close the connection pool gracefully to close them. - this.closeAsync(); + // Some connections might have been initialized, let's respect those as of 3.7.5 + if (connections.isEmpty()) { + // Close the connection pool since we have zero connections + this.closeAsync(); - final String errMsg = "Could not initialize " + minPoolSize + " (minPoolSize) connections in pool." + - " Successful connections=" + this.connections.size() + - ". Closing the connection pool."; + final String errMsg = "Could not initialize " + minPoolSize + " (minPoolSize) connections in pool for " + + this.host + ". " + + " Successful connections=" + this.connections.size() + + ". Closing the connection pool."; - Throwable cause; - Throwable result = ce; + Throwable cause; + Throwable result = ce; - if (null != (cause = result.getCause())) { - result = cause; - } + if (null != (cause = result.getCause())) { + result = cause; + } - throw new CompletionException(errMsg, result); + throw new CompletionException(errMsg, result); + } else { + // warn that the error may have the driver below the min pool size. expect recovery, but no point + // going to NoHostAvailableException for potentially a single connection error. 
+ logger.warn("ConnectionPool for " + this.host + " initialized with " + connections.size() + + " expected minPoolSize was " + minPoolSize + " - will attempt to recover", ce); + } } logger.info("Opening connection pool on {} with core size of {}", host, minPoolSize); diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java index 861b62465b..27d87b47d3 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java @@ -51,6 +51,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; import java.util.stream.IntStream; import static org.hamcrest.CoreMatchers.is; @@ -300,6 +301,70 @@ public class ClientConnectionIntegrateTest extends AbstractGremlinServerIntegrat cluster.close(); } + /** + * Added for TINKERPOP-3181 - this scenario would have previously closed the connection pool and left us with + * {@link NoHostAvailableException} just because a single connection failed on {@link Client} initialization while + * others succeeded. + */ + @Test + public void shouldSucceedWithSingleConnectionFailureOnInit() throws Exception { + // set the min connection pool size to 4 so that they all get created on init as that's the area we want to + // test host availability behavior + final Cluster cluster = TestClientFactory.build().minConnectionPoolSize(4).maxConnectionPoolSize(4). + reconnectInterval(1000). 
+ maxWaitForConnection(4000).validationRequest("g.inject()").create(); + final Client.ClusteredClient client = cluster.connect(); + + // we let 3 connections succeed but then fail on the 4th + final SingleFailConnectionFactory connectionFactory = new SingleFailConnectionFactory(3); + client.connectionFactorySupplier = () -> connectionFactory; + + // prior to 3.7.5, we would have seen this pop an exception and close the pool with error like: + // Could not initialize 4 (minPoolSize) connections in pool. Successful connections=3. Closing the connection pool. + // which doesn't really make sense because 3 prior connections were good. perhaps this fourth one just had a + // very temporary network issue. the other 3 could technically still be serviceable. the 4th shouldn't end + // connectivity and assume the host is dead as ultimately closing the pool at this point in init will end in + // NoHostAvailableException. + // + // Starting at 3.7.5, we can allow a bit more failure here before killing the pool for just a single connection + // failure. we might be below min pool size at init but we log that warning and expect fast recovery from the + // driver in the best case and in the worst case, normal processes of reconnect kick in and stabilize. + client.init(); + + assertEquals(3, connectionFactory.getConnectionsCreated()); + + // load up a hella ton of requests + final int requests = 1000; + final CountDownLatch latch = new CountDownLatch(requests); + final AtomicBoolean hadFail = new AtomicBoolean(false); + + new Thread(() -> { + IntStream.range(0, requests).forEach(i -> { + try { + client.submitAsync("1 + " + i); + } catch (Exception ex) { + // we could catch a TimeoutException here in some cases if the jitters cause a borrow of a + // connection to take too long. submitAsync() will wrap in a RuntimeException. 
can't assert + // this condition inside this thread or the test locks up + hadFail.compareAndSet(false, true); + } finally { + latch.countDown(); + } + }); + }, "worker-shouldSucceedWithSingleConnectionFailureOnInit").start(); + + // wait for requests to complete + assertTrue(latch.await(30000, TimeUnit.MILLISECONDS)); + + // we can send some requests because we have 3 created connections + assertEquals(2, client.submit("1+1").all().join().get(0).getInt()); + + // not expecting any failures + assertThat(hadFail.get(), is(false)); + + cluster.close(); + } + /** * Introduces random failures when creating a {@link Connection} for the {@link ConnectionPool}. */ @@ -331,4 +396,35 @@ public class ClientConnectionIntegrateTest extends AbstractGremlinServerIntegrat return ConnectionFactory.super.create(pool); } } + + /** + * Introduces a failure after the specified number of {@link Connection} instances are created for the + * {@link ConnectionPool}. + */ + public static class SingleFailConnectionFactory implements ConnectionFactory { + + private int connectionsCreated = 0; + private int failAfter; + private boolean failedOnce = false; + + public SingleFailConnectionFactory(final int failAfter) { + this.failAfter = failAfter; + } + + public int getConnectionsCreated() { + return connectionsCreated; + } + + @Override + public Connection create(final ConnectionPool pool) { + if (!failedOnce && connectionsCreated == failAfter) { + failedOnce = true; + throw new ConnectionException(pool.host.getHostUri(), + new SSLHandshakeException("We big mad on purpose")); + } + connectionsCreated++; + + return ConnectionFactory.super.create(pool); + } + } }
