This is an automated email from the ASF dual-hosted git repository. spmallette pushed a commit to branch TINKERPOP-2813 in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit 942c6b8f61c8d5669e070395d18478a152044794 Author: Stephen Mallette <[email protected]> AuthorDate: Fri Nov 4 15:16:19 2022 -0400 TINKERPOP-2813 Removed fast NoHostAvailableException Took a more optimistic approach to determining Host availability, which prevents what may be intermittent network/server problems from being interpreted as the Host not being reachable. Takes a faster approach to reconnecting when a host is found to "maybe" be unavailable. --- CHANGELOG.asciidoc | 3 + .../apache/tinkerpop/gremlin/driver/Client.java | 27 +++-- .../tinkerpop/gremlin/driver/Connection.java | 66 ++++++----- .../gremlin/driver/ConnectionFactory.java | 41 +++++++ .../tinkerpop/gremlin/driver/ConnectionPool.java | 123 +++++++++++++++------ .../org/apache/tinkerpop/gremlin/driver/Host.java | 18 +++ .../driver/exception/NoHostAvailableException.java | 8 +- .../driver/ClientConnectionIntegrateTest.java | 108 ++++++++++++++++++ .../AbstractGremlinServerIntegrationTest.java | 8 +- .../gremlin/server/GremlinDriverIntegrateTest.java | 2 - 10 files changed, 328 insertions(+), 76 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index a95744a692..c026be3a76 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -45,6 +45,9 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima * Modified `Connection` and `Host` job scheduling in `gremlin-driver` by dividing their work to two different thread pools and sparing work from the primary pool responsible for submitting requests and reading results. * Prevented usage of the fork-join pool for `gremlin-driver` job scheduling. * Changed `Host` initialization within a `Client` to be parallel again in `gremlin-driver`. +* Changed the mechanism for determining `Host` health, which should make the driver more resilient to intermittent network failures. +* Removed the delay for reconnecting to a potentially unhealthy `Host`, only marking it as unavailable after that initial retry fails.
+* Prevented fast `NoHostAvailableException` in favor of more direct exceptions when borrowing connections from the `ConnectionPool`. ==== Bugs diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java index bca2932d7a..0226d32e18 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java @@ -40,6 +40,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Random; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -50,6 +51,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.stream.Collectors; +import java.util.stream.IntStream; /** * A {@code Client} is constructed from a {@link Cluster} and represents a way to send messages to Gremlin Server. 
@@ -68,6 +70,8 @@ public abstract class Client { protected volatile boolean initialized; protected final Client.Settings settings; + private static final Random random = new Random(); + Client(final Cluster cluster, final Client.Settings settings) { this.cluster = cluster; this.settings = settings; @@ -423,7 +427,7 @@ public abstract class Client { */ public final static class ClusteredClient extends Client { - protected ConcurrentMap<Host, ConnectionPool> hostConnectionPools = new ConcurrentHashMap<>(); + ConcurrentMap<Host, ConnectionPool> hostConnectionPools = new ConcurrentHashMap<>(); private final AtomicReference<CompletableFuture<Void>> closing = new AtomicReference<>(null); private Throwable initializationFailure = null; @@ -492,7 +496,11 @@ public abstract class Client { protected Connection chooseConnection(final RequestMessage msg) throws TimeoutException, ConnectionException { final Iterator<Host> possibleHosts; if (msg.optionalArgs(Tokens.ARGS_HOST).isPresent()) { - // TODO: not sure what should be done if unavailable - select new host and re-submit traversal? + // looking at this code about putting the Host on the RequestMessage in light of 3.5.4, not sure + // this is being used as intended here. server side usage is to place the channel.remoteAddress + // in this token in the status metadata for the response. can't remember why it is being used this + // way here exactly. created TINKERPOP-2821 to examine this more carefully to clean this up in a + // future version. final Host host = (Host) msg.getArgs().get(Tokens.ARGS_HOST); msg.getArgs().remove(Tokens.ARGS_HOST); possibleHosts = IteratorUtils.of(host); @@ -500,16 +508,19 @@ public abstract class Client { possibleHosts = this.cluster.loadBalancingStrategy().select(msg); } - // you can get no possible hosts in more than a few situations. perhaps the servers are just all down. - // or perhaps the client is not configured properly (disables ssl when ssl is enabled on the server). 
- if (!possibleHosts.hasNext()) - throwNoHostAvailableException(); - - final Host bestHost = possibleHosts.next(); + // try a random host if none are marked available. maybe it will reconnect in the meantime. better than + // going straight to a fast NoHostAvailableException as was the case in versions 3.5.4 and earlier + final Host bestHost = possibleHosts.hasNext() ? possibleHosts.next() : chooseRandomHost(); final ConnectionPool pool = hostConnectionPools.get(bestHost); return pool.borrowConnection(cluster.connectionPoolSettings().maxWaitForConnection, TimeUnit.MILLISECONDS); } + private Host chooseRandomHost() { + final List<Host> hosts = new ArrayList<>(cluster.allHosts()); + final int ix = random.nextInt(hosts.size()); + return hosts.get(ix); + } + /** * Initializes the connection pools on all hosts. */ diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java index 81dd440891..0dfe0dbab3 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java @@ -114,11 +114,8 @@ final class Connection { channel = b.connect(uri.getHost(), uri.getPort()).sync().channel(); channelizer.connected(); - /* Configure behaviour on close of this channel. - * - * This callback would trigger the workflow to destroy this connection, so that a new request doesn't pick - * this closed connection. - */ + // Configure behaviour on close of this channel. This callback would trigger the workflow to destroy this + // connection, so that a new request doesn't pick this closed connection. 
final Connection thisConnection = this; channel.closeFuture().addListener((ChannelFutureListener) future -> { logger.debug("OnChannelClose callback called for channel {}", channel); @@ -138,7 +135,7 @@ final class Connection { logger.info("Created new connection for {}", uri); } catch (Exception ex) { - throw new ConnectionException(uri, "Could not open " + this.getConnectionInfo(true), ex); + throw new ConnectionException(uri, "Could not open " + getConnectionInfo(true), ex); } } @@ -316,6 +313,9 @@ final class Connection { RequestMessage.build(Tokens.OPS_CLOSE).addArg(Tokens.ARGS_FORCE, forceClose)).create(); final CompletableFuture<ResultSet> closed = new CompletableFuture<>(); + + // TINKERPOP-2822 should investigate this write more carefully to check for sensible behavior + // in the event the Channel was not created but we try to send the close message write(closeMessage, closed); try { @@ -336,27 +336,37 @@ final class Connection { } } - channelizer.close(channel); + // take a defensive posture here in the event the channelizer didn't get initialized somehow and a + // close() on the Connection is still called + if (channelizer != null) + channelizer.close(channel); + + // seems possible that the channelizer could initialize but fail to produce a channel, so worth checking + // null before proceeding here + if (channel != null) { + final ChannelPromise promise = channel.newPromise(); + promise.addListener(f -> { + if (f.cause() != null) { + future.completeExceptionally(f.cause()); + } else { + if (logger.isDebugEnabled()) + logger.debug("{} destroyed successfully.", connectionInfo); - final ChannelPromise promise = channel.newPromise(); - promise.addListener(f -> { - if (f.cause() != null) { - future.completeExceptionally(f.cause()); - } else { - if (logger.isDebugEnabled()) - logger.debug("{} destroyed successfully.", connectionInfo); + future.complete(null); + } + }); - future.complete(null); + // close the netty channel, if not already closed + if 
(!channel.closeFuture().isDone()) { + channel.close(promise); + } else { + if (!promise.trySuccess()) { + logger.warn("Failed to mark a promise as success because it is done already: {}", promise); + } } - }); - - // close the netty channel, if not already closed - if (!channel.closeFuture().isDone()) { - channel.close(promise); } else { - if (!promise.trySuccess()) { - logger.warn("Failed to mark a promise as success because it is done already: {}", promise); - } + logger.debug("Connection {} is shutting down but the channel was not initialized to begin with", + getConnectionInfo()); } } } @@ -365,7 +375,7 @@ final class Connection { * Gets a message that describes the state of the connection. */ public String getConnectionInfo() { - return getConnectionInfo(true); + return this.getConnectionInfo(true); } /** @@ -375,10 +385,10 @@ final class Connection { */ public String getConnectionInfo(final boolean showHost) { return showHost ? - String.format("Connection{channel=%s, host=%s, isDead=%s, borrowed=%s, pending=%s}", - getChannelId(), pool.host, isDead(), borrowed, pending.size()) : - String.format("Connection{channel=%s, isDead=%s, borrowed=%s, pending=%s}", - channel.id().asShortText(), isDead(), borrowed, pending.size()); + String.format("Connection{channel=%s host=%s isDead=%s borrowed=%s pending=%s markedReplaced=%s closing=%s}", + getChannelId(), pool.host.toString(), isDead(), this.borrowed.get(), getPending().size(), this.isBeingReplaced, isClosing()) : + String.format("Connection{channel=%s isDead=%s borrowed=%s pending=%s markedReplaced=%s closing=%s}", + getChannelId(), isDead(), this.borrowed.get(), getPending().size(), this.isBeingReplaced, isClosing()); } /** diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionFactory.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionFactory.java new file mode 100644 index 0000000000..0771402c70 --- /dev/null +++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionFactory.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.driver; + +import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException; + +/** + * A factory that is responsible for creating a {@link Connection} instance. The {@link DefaultConnectionFactory} + * simply news up a {@code Connection} using its default constructor. This interface is mostly present to help + * enable better testing of the driver internals and likely shouldn't be used otherwise. + */ +interface ConnectionFactory { + + /** + * Create a connection for the specified {@link ConnectionPool}. + */ + public default Connection create(final ConnectionPool pool) throws ConnectionException { + return new Connection(pool.host.getHostUri(), pool, pool.settings().maxInProcessPerConnection); + } + + /** + * Default implementation. 
+ */ + public static class DefaultConnectionFactory implements ConnectionFactory { } +} diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java index b1a9632738..4e5f019273 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java @@ -67,12 +67,12 @@ final class ConnectionPool { private final String poolLabel; private final AtomicInteger scheduledForCreation = new AtomicInteger(); - private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>(); private volatile int waiter = 0; private final Lock waitLock = new ReentrantLock(true); private final Condition hasAvailableConnection = waitLock.newCondition(); + ConnectionFactory connectionFactory; public ConnectionPool(final Host host, final Client client) { this(host, client, Optional.empty(), Optional.empty()); @@ -80,9 +80,15 @@ final class ConnectionPool { public ConnectionPool(final Host host, final Client client, final Optional<Integer> overrideMinPoolSize, final Optional<Integer> overrideMaxPoolSize) { + this(host, client, overrideMinPoolSize, overrideMaxPoolSize, new ConnectionFactory.DefaultConnectionFactory()); + } + + ConnectionPool(final Host host, final Client client, final Optional<Integer> overrideMinPoolSize, + final Optional<Integer> overrideMaxPoolSize, final ConnectionFactory connectionFactory) { this.host = host; this.client = client; this.cluster = client.cluster; + this.connectionFactory = connectionFactory; poolLabel = "Connection Pool {host=" + host + "}"; final Settings.ConnectionPoolSettings settings = settings(); @@ -100,7 +106,7 @@ final class ConnectionPool { for (int i = 0; i < minPoolSize; i++) { connectionCreationFutures.add(CompletableFuture.runAsync(() -> { try { - this.connections.add(new 
Connection(host.getHostUri(), this, settings.maxInProcessPerConnection)); + this.connections.add(connectionFactory.create(this)); this.open.incrementAndGet(); } catch (ConnectionException e) { throw new CompletionException(e); @@ -110,7 +116,7 @@ final class ConnectionPool { CompletableFuture.allOf(connectionCreationFutures.toArray(new CompletableFuture[0])).join(); } catch (CancellationException ce) { - logger.warn("Initialization of connections cancelled for {}", getPoolInfo(), ce); + logger.warn("Initialization of connections cancelled for {}", this.getPoolInfo(), ce); throw ce; } catch (CompletionException ce) { // Some connections might have been initialized. Close the connection pool gracefully to close them. @@ -312,20 +318,31 @@ final class ConnectionPool { private void newConnection() { cluster.connectionScheduler().submit(() -> { - addConnectionIfUnderMaximum(); + // seems like this should be decremented first because if addConnectionIfUnderMaximum fails there is + // nothing that wants to decrement this number and so it leaves things in a state where you could + // newConnection() doesn't seem to get called at all because it believes connections are being currently + // created. this seems to lead to situations where the client can never borrow a connection and it's as + // though it can't reconnect at all. this was hard to test but it seemed to happen regularly after + // introduced the ConnectionFactory that enabled a way to introduce connection jitters (i.e. failures in + // creation of a Connection) at which point it seemed to happen with some regularity. 
scheduledForCreation.decrementAndGet(); + addConnectionIfUnderMaximum(); return null; }); } private boolean addConnectionIfUnderMaximum() { + final int openCountToActOn; + while (true) { - int opened = open.get(); + final int opened = open.get(); if (opened >= maxPoolSize) return false; - if (open.compareAndSet(opened, opened + 1)) + if (open.compareAndSet(opened, opened + 1)) { + openCountToActOn = opened; break; + } } if (isClosed()) { @@ -334,10 +351,13 @@ final class ConnectionPool { } try { - connections.add(new Connection(host.getHostUri(), this, settings().maxInProcessPerConnection)); - } catch (ConnectionException ce) { - logger.error("Connections were under max, but there was an error creating the connection.", ce); + connections.add(connectionFactory.create(this)); + } catch (Exception ce) { open.decrementAndGet(); + logger.error(String.format( + "Connections[%s] were under maximum allowed[%s], but there was an error creating a new connection", + openCountToActOn, maxPoolSize), + ce); considerHostUnavailable(); return false; } @@ -432,20 +452,33 @@ final class ConnectionPool { // if we timeout borrowing a connection that might mean the host is dead (or the timeout was super short). // either way supply a function to reconnect + final TimeoutException timeoutException = new TimeoutException(timeoutErrorMessage); this.considerHostUnavailable(); - throw new TimeoutException(timeoutErrorMessage); + throw timeoutException; } + /** + * On a failure to get a {@link Connection} this method is called to determine if the {@link Host} should be + * marked as unavailable and to establish a background reconnect operation. + */ public void considerHostUnavailable() { - // called when a connection is "dead" due to a non-recoverable error. 
- host.makeUnavailable(this::tryReconnect); - - // if the host is unavailable then we should release the connections - connections.forEach(this::definitelyDestroyConnection); - - // let the load-balancer know that the host is acting poorly - this.cluster.loadBalancingStrategy().onUnavailable(host); + // if there is at least one available connection the host has to still be around (or is perhaps on its way out + // but we'll stay optimistic in this check). no connections also means "unhealthy". unsure if there is an ok + // "no connections" state we can get into. in any event if there are no connections then we'd just try to + // immediately reconnect below anyway so perhaps that state isn't really something to worry about. + final boolean maybeUnhealthy = connections.stream().allMatch(Connection::isDead); + if (maybeUnhealthy) { + // immediately fire off an attempt to reconnect because there are no active connections. + host.tryReconnectingImmediately(this::tryReconnect); + + // let the load-balancer know that the host is acting poorly + if (!host.isAvailable()) { + // if the host is unavailable then we should release the connections + connections.forEach(this::definitelyDestroyConnection); + this.cluster.loadBalancingStrategy().onUnavailable(host); + } + } } /** @@ -457,7 +490,13 @@ final class ConnectionPool { Connection connection = null; try { - connection = borrowConnection(cluster.connectionPoolSettings().maxWaitForConnection, TimeUnit.MILLISECONDS); + // rather than rely on borrowConnection() infrastructure and the pool create a brand new Connection + // instance solely for the purpose of this ping. this ensures that if the pool is overloaded that we + // make an honest attempt at validating host health without failing over some timeout waiting for a + // connection in the pool. not sure if we should try to keep this connection if it succeeds and if the + // pool needs it. 
for now that seems like an unnecessary added bit of complexity for dealing with this + // error state + connection = connectionFactory.create(this); final RequestMessage ping = client.buildMessage(cluster.validationRequest()).create(); final CompletableFuture<ResultSet> f = new CompletableFuture<>(); connection.write(ping, f); @@ -467,9 +506,13 @@ final class ConnectionPool { this.cluster.loadBalancingStrategy().onAvailable(h); return true; } catch (Exception ex) { - logger.debug("Failed reconnect attempt on {}", h, ex); - if (connection != null) definitelyDestroyConnection(connection); + logger.error(String.format("Failed reconnect attempt on %s%s%s", + h, System.lineSeparator(), this.getPoolInfo()), ex); return false; + } finally { + if (connection != null) { + connection.closeAsync(); + } } } @@ -547,7 +590,7 @@ final class ConnectionPool { */ public String getPoolInfo(final Connection connectionToCallout) { final StringBuilder sb = new StringBuilder("ConnectionPool ("); - sb.append(host); + sb.append(host.toString()); sb.append(")"); if (connections.isEmpty()) { @@ -555,25 +598,35 @@ final class ConnectionPool { } else { final int connectionCount = connections.size(); sb.append(System.lineSeparator()); - sb.append(String.format("Connection Pool Status (size=%s max=%s min=%s)", connectionCount, maxPoolSize, minPoolSize)); + sb.append(String.format("Connection Pool Status (size=%s max=%s min=%s toCreate=%s markedOpen=%s bin=%s)", + connectionCount, maxPoolSize, minPoolSize, this.scheduledForCreation.get(), this.open.get(), bin.size())); + sb.append(System.lineSeparator()); + + appendConnections(sb, connectionToCallout, connections); + sb.append(System.lineSeparator()); + sb.append("-- bin --"); sb.append(System.lineSeparator()); + appendConnections(sb, connectionToCallout, new ArrayList<>(bin)); + } - for (int ix = 0; ix < connectionCount; ix++) { - final Connection c = connections.get(ix); + return sb.toString().trim(); + } - if (c == connectionToCallout) - 
sb.append("==> "); - else - sb.append("> "); + private void appendConnections(final StringBuilder sb, final Connection connectionToCallout, + final List<Connection> connections) { + final int connectionCount = connections.size(); + for (int ix = 0; ix < connectionCount; ix++) { + final Connection c = connections.get(ix); + if (c.equals(connectionToCallout)) + sb.append("==> "); + else + sb.append("> "); - sb.append(c.getConnectionInfo(false)); + sb.append(c.getConnectionInfo(false)); - if (ix < connectionCount - 1) - sb.append(System.lineSeparator()); - } + if (ix < connectionCount - 1) + sb.append(System.lineSeparator()); } - - return sb.toString().trim(); } @Override diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Host.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Host.java index fb0a9e333a..3d492fa612 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Host.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Host.java @@ -26,6 +26,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -85,6 +86,23 @@ public final class Host { } } + void tryReconnectingImmediately(final Function<Host, Boolean> reconnect) { + // only do a connection re-attempt if one is not already in progress + if (retryInProgress.compareAndSet(Boolean.FALSE, Boolean.TRUE)) { + retryThread = this.cluster.hostScheduler().scheduleAtFixedRate(() -> { + logger.debug("Trying to reconnect to host at {}", this); + final boolean reconnected = reconnect.apply(this); + if (reconnected) + reconnected(); + else { + logger.warn("Marking {} as unavailable. 
Trying to reconnect.", this); + isAvailable = false; + } + }, 0, + cluster.connectionPoolSettings().reconnectInterval, TimeUnit.MILLISECONDS); + } + } + private void reconnected() { // race condition! retry boolean could be set to false, a new retryThread created above // and then cancelled here. But we're only executing this at all because we *have* reconnected diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/NoHostAvailableException.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/NoHostAvailableException.java index 1d2a70f7ea..9ccea2dffa 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/NoHostAvailableException.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/exception/NoHostAvailableException.java @@ -21,10 +21,14 @@ package org.apache.tinkerpop.gremlin.driver.exception; public class NoHostAvailableException extends RuntimeException { public NoHostAvailableException() { - super("All hosts are considered unavailable due to previous exceptions. Check the error log to find the actual reason."); + this("All hosts are considered unavailable due to previous exceptions. 
Check the error log to find the actual reason."); } - public NoHostAvailableException(Throwable ex) { + public NoHostAvailableException(final String message) { + super(message); + } + + public NoHostAvailableException(final Throwable ex) { super(ex); } diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java index 8ebad7809e..1417409c47 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java @@ -21,6 +21,8 @@ package org.apache.tinkerpop.gremlin.driver; import io.netty.handler.codec.CorruptedFrameException; import org.apache.log4j.Level; import org.apache.log4j.Logger; +import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException; +import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException; import org.apache.tinkerpop.gremlin.driver.ser.Serializers; import org.apache.tinkerpop.gremlin.server.AbstractGremlinServerIntegrationTest; import org.apache.tinkerpop.gremlin.server.TestClientFactory; @@ -28,13 +30,25 @@ import org.apache.tinkerpop.gremlin.util.Log4jRecordingAppender; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLHandshakeException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.IntStream; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.number.OrderingComparison.greaterThan; import static org.junit.Assert.assertEquals; +import static 
org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public class ClientConnectionIntegrateTest extends AbstractGremlinServerIntegrationTest { + private static final org.slf4j.Logger logger = LoggerFactory.getLogger(ClientConnectionIntegrateTest.class); private Log4jRecordingAppender recordingAppender = null; private Level previousLogLevel; @@ -110,4 +124,98 @@ public class ClientConnectionIntegrateTest extends AbstractGremlinServerIntegrat assertThat(recordingAppender.logContainsAny("^(?!.*(isDead=false)).*isDead=true.*destroyed successfully.$"), is(true)); } + + /** + * Added for TINKERPOP-2813 - this scenario would have previously thrown tons of + * {@link NoHostAvailableException}. + */ + @Test + public void shouldSucceedWithJitteryConnection() throws Exception { + final Cluster cluster = TestClientFactory.build().minConnectionPoolSize(1).maxConnectionPoolSize(4). + reconnectInterval(1000). + maxWaitForConnection(4000).validationRequest("g.inject()").create(); + final Client.ClusteredClient client = cluster.connect(); + + client.init(); + + // every 10 connections let's have some problems + final JitteryConnectionFactory connectionFactory = new JitteryConnectionFactory(3); + client.hostConnectionPools.forEach((h, pool) -> pool.connectionFactory = connectionFactory); + + // get an initial connection which marks the host as available + assertEquals(2, client.submit("1+1").all().join().get(0).getInt()); + + // network is gonna get fishy - ConnectionPool should try to grow during the workload below and when it + // does some connections will fail to create in the background which should log some errors but not tank + // the submit() as connections that are currently still working and active should be able to handle the load. 
+ connectionFactory.jittery = true; + + // load up a hella ton of requests + final int requests = 1000; + final CountDownLatch latch = new CountDownLatch(requests); + final AtomicBoolean hadFailOtherThanTimeout = new AtomicBoolean(false); + + new Thread(() -> { + IntStream.range(0, requests).forEach(i -> { + try { + client.submitAsync("1 + " + i); + } catch (Exception ex) { + // we could catch a TimeoutException here in some cases if the jitters cause a borrow of a + // connection to take too long. submitAsync() will wrap in a RuntimeException. can't assert + // this condition inside this thread or the test locks up + hadFailOtherThanTimeout.compareAndSet(false, !(ex.getCause() instanceof TimeoutException)); + } finally { + latch.countDown(); + } + }); + }, "worker-shouldSucceedWithJitteryConnection").start(); + + // wait long enough for the jitters to kick in at least a little + while (latch.getCount() > 500) { + TimeUnit.MILLISECONDS.sleep(50); + } + + // wait for requests to complete + assertTrue(latch.await(30000, TimeUnit.MILLISECONDS)); + + // make sure we had some failures for sure coming out the factory + assertThat(connectionFactory.getNumberOfFailures(), is(greaterThan(0L))); + + // if there was a exception in the worker thread, then it had better be a TimeoutException + assertThat(hadFailOtherThanTimeout.get(), is(false)); + + cluster.close(); + } + + /** + * Introduces random failures when creating a {@link Connection} for the {@link ConnectionPool}. 
+ */ + public static class JitteryConnectionFactory implements ConnectionFactory { + + private volatile boolean jittery = false; + private final AtomicLong connectionsCreated = new AtomicLong(0); + private final AtomicLong connectionFailures = new AtomicLong(0); + private final int numberOfConnectionsBetweenErrors; + + public JitteryConnectionFactory(final int numberOfConnectionsBetweenErrors) { + this.numberOfConnectionsBetweenErrors = numberOfConnectionsBetweenErrors; + } + + public long getNumberOfFailures() { + return connectionFailures.get(); + } + + @Override + public Connection create(final ConnectionPool pool) { + + // fail creating a connection every 10 attempts or so when jittery + if (jittery && connectionsCreated.incrementAndGet() % numberOfConnectionsBetweenErrors == 0) { + connectionFailures.incrementAndGet(); + throw new ConnectionException(pool.host.getHostUri(), + new SSLHandshakeException("SSL on the funk - server is big mad")); + } + + return ConnectionFactory.super.create(pool); + } + } } diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractGremlinServerIntegrationTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractGremlinServerIntegrationTest.java index 2b66ce1de1..6a8acbad15 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractGremlinServerIntegrationTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/AbstractGremlinServerIntegrationTest.java @@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.server; import org.apache.tinkerpop.gremlin.server.channel.UnifiedChannelizer; import org.apache.tinkerpop.gremlin.server.channel.UnifiedChannelizerIntegrateTest; import org.apache.tinkerpop.gremlin.server.op.OpLoader; +import org.apache.tinkerpop.gremlin.server.util.ServerGremlinExecutor; import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph; import org.junit.After; import org.junit.Before; @@ -31,6 +32,7 @@ 
import org.slf4j.LoggerFactory; import java.io.File; import java.io.InputStream; +import java.util.concurrent.CompletableFuture; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assume.assumeThat; @@ -117,6 +119,10 @@ public abstract class AbstractGremlinServerIntegrationTest { } public void startServer() throws Exception { + startServerAsync().join(); + } + + public CompletableFuture<ServerGremlinExecutor> startServerAsync() throws Exception { final InputStream stream = getSettingsInputStream(); final Settings settings = Settings.read(stream); overriddenSettings = overrideSettings(settings); @@ -131,7 +137,7 @@ public abstract class AbstractGremlinServerIntegrationTest { this.server = new GremlinServer(overriddenSettings); - server.start().join(); + return server.start(); } @After diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java index e44660ee50..43a53bae7a 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java @@ -55,7 +55,6 @@ import org.apache.tinkerpop.gremlin.util.function.FunctionUtils; import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; import org.junit.After; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; @@ -63,7 +62,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.awt.Color; -import java.io.File; import java.net.ConnectException; import java.time.Instant; import java.util.ArrayList;
