This is an automated email from the ASF dual-hosted git repository. valentyn pushed a commit to branch valentyn/connection-pooling in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit 1c726292d88147e30c853c6e48af07a4deb59872 Author: Valentyn Kahamlyk <[email protected]> AuthorDate: Tue Apr 30 13:29:12 2024 -0700 connection pooling clean up --- .../tinkerpop/gremlin/driver/Channelizer.java | 9 +- .../apache/tinkerpop/gremlin/driver/Client.java | 292 +------ .../apache/tinkerpop/gremlin/driver/Cluster.java | 34 +- .../tinkerpop/gremlin/driver/Connection.java | 49 +- .../tinkerpop/gremlin/driver/ConnectionPool.java | 43 +- .../driver/handler/GremlinResponseHandler.java | 31 +- .../driver/handler/HttpGremlinRequestEncoder.java | 10 - .../driver/remote/DriverRemoteConnection.java | 17 +- .../gremlin/driver/ConnectionPoolTest.java | 99 +++ .../apache/tinkerpop/gremlin/driver/HostTest.java | 4 +- .../WebSocketClientBehaviorIntegrateTest.java | 956 ++++++++++----------- .../driver/remote/DriverRemoteConnectionTest.java | 2 - .../driver/ClientConnectionIntegrateTest.java | 1 - .../gremlin/server/GremlinDriverIntegrateTest.java | 300 +------ .../gremlin/server/HttpDriverIntegrateTest.java | 126 ++- 15 files changed, 705 insertions(+), 1268 deletions(-) diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java index 31c34d6a6e..a8cb6d69be 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java @@ -32,8 +32,7 @@ import org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinResponseStreamDeco import org.apache.tinkerpop.gremlin.util.ser.MessageTextSerializerV4; import java.util.Optional; -import java.util.UUID; -import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicReference; /** * Client-side channel initializer interface. 
It is responsible for constructing the Netty {@code ChannelPipeline} @@ -76,7 +75,7 @@ public interface Channelizer extends ChannelHandler { abstract class AbstractChannelizer extends ChannelInitializer<SocketChannel> implements Channelizer { protected Connection connection; protected Cluster cluster; - private ConcurrentMap<UUID, ResultQueue> pending; + private AtomicReference<ResultQueue> pending; protected static final String PIPELINE_GREMLIN_HANDLER = "gremlin-handler"; public static final String PIPELINE_SSL_HANDLER = "gremlin-ssl-handler"; @@ -149,10 +148,6 @@ public interface Channelizer extends ChannelHandler { public void init(final Connection connection) { super.init(connection); - // server does not support sessions so this channerlizer can't support the SessionedClient - if (connection.getClient() instanceof Client.SessionedClient) - throw new IllegalStateException(String.format("Cannot use sessions or tx() with %s", HttpChannelizer.class.getSimpleName())); - gremlinRequestEncoder = new HttpGremlinRequestEncoder(cluster.getSerializer(), cluster.getRequestInterceptor(), cluster.isUserAgentOnConnectEnabled()); gremlinResponseDecoder = new HttpGremlinResponseStreamDecoder((MessageTextSerializerV4<?>) cluster.getSerializer()); } diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java index f47d529d18..93e1c9d70e 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java @@ -19,7 +19,6 @@ package org.apache.tinkerpop.gremlin.driver; import org.apache.commons.lang3.exception.ExceptionUtils; -import org.apache.tinkerpop.gremlin.util.Tokens; import org.apache.tinkerpop.gremlin.util.message.RequestMessageV4; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,19 +29,15 @@ import 
org.apache.tinkerpop.gremlin.process.traversal.Traversal; import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource; import org.apache.tinkerpop.gremlin.process.traversal.Traverser; import org.apache.tinkerpop.gremlin.structure.Graph; -import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; import javax.net.ssl.SSLException; import java.net.ConnectException; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Random; -import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; @@ -52,7 +47,6 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.stream.Collectors; -import java.util.stream.IntStream; /** * A {@code Client} is constructed from a {@link Cluster} and represents a way to send messages to Gremlin Server. @@ -71,13 +65,11 @@ public abstract class Client { protected final Cluster cluster; protected volatile boolean initialized; - protected final Client.Settings settings; private static final Random random = new Random(); - Client(final Cluster cluster, final Client.Settings settings) { + Client(final Cluster cluster) { this.cluster = cluster; - this.settings = settings; } /** @@ -120,7 +112,7 @@ public abstract class Client { * the created {@code Client}. */ public Client alias(final Map<String, String> aliases) { - return new AliasClusteredClient(this, aliases, settings); + return new AliasClusteredClient(this, aliases); } /** @@ -402,13 +394,6 @@ public abstract class Client { closeAsync().join(); } - /** - * Gets the {@link Client.Settings}. - */ - public Settings getSettings() { - return settings; - } - /** * Gets the {@link Cluster} that spawned this {@code Client}. 
*/ @@ -433,8 +418,8 @@ public abstract class Client { private final AtomicReference<CompletableFuture<Void>> closing = new AtomicReference<>(null); private Throwable initializationFailure = null; - ClusteredClient(final Cluster cluster, final Client.Settings settings) { - super(cluster, settings); + ClusteredClient(final Cluster cluster) { + super(cluster); } @Override @@ -487,7 +472,7 @@ public abstract class Client { */ @Override public Client alias(final Map<String, String> aliases) { - return new AliasClusteredClient(this, aliases, settings); + return new AliasClusteredClient(this, aliases); } /** @@ -496,19 +481,7 @@ public abstract class Client { */ @Override protected Connection chooseConnection(final RequestMessageV4 msg) throws TimeoutException, ConnectionException { - final Iterator<Host> possibleHosts; -// if (msg.optionalArgs(Tokens.ARGS_HOST).isPresent()) { -// // looking at this code about putting the Host on the RequestMessage in light of 3.5.4, not sure -// // this is being used as intended here. server side usage is to place the channel.remoteAddress -// // in this token in the status metadata for the response. can't remember why it is being used this -// // way here exactly. created TINKERPOP-2821 to examine this more carefully to clean this up in a -// // future version. -// final Host host = (Host) msg.getArgs().get(Tokens.ARGS_HOST); -// msg.getArgs().remove(Tokens.ARGS_HOST); -// possibleHosts = IteratorUtils.of(host); -// } else { - possibleHosts = this.cluster.loadBalancingStrategy().select(msg); -// } + final Iterator<Host> possibleHosts = this.cluster.loadBalancingStrategy().select(msg); // try a random host if none are marked available. maybe it will reconnect in the meantime. 
better than // going straight to a fast NoHostAvailableException as was the case in versions 3.5.4 and earlier @@ -634,8 +607,8 @@ public abstract class Client { private final Map<String, String> aliases = new HashMap<>(); final CompletableFuture<Void> close = new CompletableFuture<>(); - AliasClusteredClient(final Client client, final Map<String, String> aliases, final Client.Settings settings) { - super(client.cluster, settings); + AliasClusteredClient(final Client client, final Map<String, String> aliases) { + super(client.cluster); this.client = client; this.aliases.putAll(aliases); } @@ -655,7 +628,6 @@ public abstract class Client { // apply settings if they were made available options.getBatchSize().ifPresent(batchSize -> request.addChunkSize(batchSize)); options.getTimeout().ifPresent(timeout -> request.addTimeoutMillis(timeout)); -// options.getOverrideRequestId().ifPresent(request::overrideRequestId); // options.getUserAgent().ifPresent(userAgent -> request.add(Tokens.ARGS_USER_AGENT, userAgent)); options.getMaterializeProperties().ifPresent(mp -> request.addMaterializeProperties(mp)); @@ -751,253 +723,7 @@ public abstract class Client { @Override public Client alias(final Map<String, String> aliases) { if (close.isDone()) throw new IllegalStateException("Client is closed"); - return new AliasClusteredClient(client, aliases, settings); - } - } - - /** - * A {@code Client} implementation that operates in the context of a session. Requests are sent to a single - * server, where each request is bound to the same thread with the same set of bindings across requests. - * Transaction are not automatically committed. It is up the client to issue commit/rollback commands. 
- */ - public final static class SessionedClient extends Client { - private final String sessionId; - private final boolean manageTransactions; - - private ConnectionPool connectionPool; - - private final AtomicReference<CompletableFuture<Void>> closing = new AtomicReference<>(null); - - SessionedClient(final Cluster cluster, final Client.Settings settings) { - super(cluster, settings); - this.sessionId = settings.getSession().get().sessionId; - this.manageTransactions = settings.getSession().get().manageTransactions; - } - - /** - * Returns the session identifier bound to this {@code Client}. - */ - public String getSessionId() { - return sessionId; - } - - /** - * todo. - */ - @Override - public RequestMessageV4.Builder buildMessage(final RequestMessageV4.Builder builder) { -// TODO: replace this with new Transaction API later. -// builder.processor("session"); -// builder.addArg(Tokens.ARGS_SESSION, sessionId); -// builder.addArg(Tokens.ARGS_MANAGE_TRANSACTION, manageTransactions); - return builder; - } - - /** - * Since the session is bound to a single host, simply borrow a connection from that pool. - */ - @Override - protected Connection chooseConnection(final RequestMessageV4 msg) throws TimeoutException, ConnectionException { - return connectionPool.borrowConnection(cluster.connectionPoolSettings().maxWaitForConnection, TimeUnit.MILLISECONDS); - } - - /** - * Randomly choose an available {@link Host} to bind the session too and initialize the {@link ConnectionPool}. 
- */ - @Override - protected void initializeImplementation() { - // chooses a host at random from all hosts - if (cluster.allHosts().isEmpty()) { - throw new IllegalStateException("No available host in the cluster"); - } - - final List<Host> hosts = new ArrayList<>(cluster.allHosts()); - Collections.shuffle(hosts); - // if a host has been marked as available, use it instead - Optional<Host> host = hosts.stream().filter(Host::isAvailable).findFirst(); - final Host selectedHost = host.orElse(hosts.get(0)); - - // only mark host as available if we can initialize the connection pool successfully - try { - connectionPool = new ConnectionPool(selectedHost, this, Optional.of(1), Optional.of(1)); - selectedHost.makeAvailable(); - } catch (RuntimeException ex) { - logger.error("Could not initialize client for {}", host, ex); - throw new NoHostAvailableException(ex); - } - } - - @Override - public boolean isClosing() { - return closing.get() != null; - } - - /** - * Close the bound {@link ConnectionPool}. - */ - @Override - public synchronized CompletableFuture<Void> closeAsync() { - if (closing.get() != null) - return closing.get(); - - // the connection pool may not have been initialized if requests weren't sent across it. in those cases - // we just need to return a pre-completed future - final CompletableFuture<Void> connectionPoolClose = null == connectionPool ? - CompletableFuture.completedFuture(null) : connectionPool.closeAsync(); - closing.set(connectionPoolClose); - return connectionPoolClose; - } - } - - /** - * Settings given to {@link Cluster#connect(Client.Settings)} that configures how a {@link Client} will behave. - */ - public static class Settings { - private final Optional<SessionSettings> session; - - private Settings(final Builder builder) { - this.session = builder.session; - } - - public static Builder build() { - return new Builder(); - } - - /** - * Determines if the {@link Client} is to be constructed with a session. 
If the value is present, then a - * session is expected. - */ - public Optional<SessionSettings> getSession() { - return session; - } - - public static class Builder { - private Optional<SessionSettings> session = Optional.empty(); - - private Builder() { - } - - /** - * Enables a session. By default this will create a random session name and configure transactions to be - * unmanaged. This method will override settings provided by calls to the other overloads of - * {@code useSession}. - */ - public Builder useSession(final boolean enabled) { - session = enabled ? Optional.of(SessionSettings.build().create()) : Optional.empty(); - return this; - } - - /** - * Enables a session. By default this will create a session with the provided name and configure - * transactions to be unmanaged. This method will override settings provided by calls to the other - * overloads of {@code useSession}. - */ - public Builder useSession(final String sessionId) { - session = sessionId != null && !sessionId.isEmpty() ? - Optional.of(SessionSettings.build().sessionId(sessionId).create()) : Optional.empty(); - return this; - } - - /** - * Enables a session. This method will override settings provided by calls to the other overloads of - * {@code useSession}. - */ - public Builder useSession(final SessionSettings settings) { - session = Optional.ofNullable(settings); - return this; - } - - public Settings create() { - return new Settings(this); - } - - } - } - - /** - * Settings for a {@link Client} that involve a session. - */ - public static class SessionSettings { - private final boolean manageTransactions; - private final String sessionId; - private final boolean forceClosed; - - private SessionSettings(final Builder builder) { - manageTransactions = builder.manageTransactions; - sessionId = builder.sessionId; - forceClosed = builder.forceClosed; - } - - /** - * If enabled, transactions will be "managed" such that each request will represent a complete transaction. 
- */ - public boolean manageTransactions() { - return manageTransactions; - } - - /** - * Provides the identifier of the session. - */ - public String getSessionId() { - return sessionId; - } - - /** - * Determines if the session will be force closed. See {@link Builder#forceClosed(boolean)} for more details - * on what that means. - */ - public boolean isForceClosed() { - return forceClosed; - } - - public static SessionSettings.Builder build() { - return new SessionSettings.Builder(); - } - - public static class Builder { - private boolean manageTransactions = false; - private String sessionId = UUID.randomUUID().toString(); - private boolean forceClosed = false; - - private Builder() { - } - - /** - * If enabled, transactions will be "managed" such that each request will represent a complete transaction. - * By default this value is {@code false}. - */ - public Builder manageTransactions(final boolean manage) { - manageTransactions = manage; - return this; - } - - /** - * Provides the identifier of the session. This value cannot be null or empty. By default it is set to - * a random {@code UUID}. - */ - public Builder sessionId(final String sessionId) { - if (null == sessionId || sessionId.isEmpty()) - throw new IllegalArgumentException("sessionId cannot be null or empty"); - this.sessionId = sessionId; - return this; - } - - /** - * Determines if the session should be force closed when the client is closed. Force closing will not - * attempt to close open transactions from existing running jobs and leave it to the underlying graph to - * decided how to proceed with those orphaned transactions. Setting this to {@code true} tends to lead to - * faster close operation which can be desirable if Gremlin Server has a long session timeout and a long - * script evaluation timeout as attempts to close long run jobs can occur more rapidly. By default, this - * value is {@code false}. 
- */ - public Builder forceClosed(final boolean forced) { - this.forceClosed = forced; - return this; - } - - public SessionSettings create() { - return new SessionSettings(this); - } + return new AliasClusteredClient(client, aliases); } } } diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java index 407b4cefe5..aacacc6042 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java @@ -28,7 +28,6 @@ import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import io.netty.util.concurrent.Future; import org.apache.commons.configuration2.Configuration; import org.apache.tinkerpop.gremlin.util.MessageSerializer; -import org.apache.tinkerpop.gremlin.util.Tokens; import org.apache.tinkerpop.gremlin.util.message.RequestMessageV4; import org.apache.tinkerpop.gremlin.util.ser.Serializers; import io.netty.bootstrap.Bootstrap; @@ -92,23 +91,7 @@ public final class Cluster { } /** - * Creates a {@link Client.ClusteredClient} instance to this {@code Cluster}, meaning requests will be routed to - * one or more servers (depending on the cluster configuration), where each request represents the entirety of a - * transaction. A commit or rollback (in case of error) is automatically executed at the end of the request. - * <p/> - * Note that calling this method does not imply that a connection is made to the server itself at this point. - * Therefore, if there is only one server specified in the {@code Cluster} and that server is not available an - * error will not be raised at this point. Connections get initialized in the {@link Client} when a request is - * submitted or can be directly initialized via {@link Client#init()}. 
- */ - public <T extends Client> T connect() { - final Client client = new Client.ClusteredClient(this, Client.Settings.build().create()); - manager.trackClient(client); - return (T) client; - } - - /** - * Creates a {@link Client.SessionedClient} instance to this {@code Cluster}, meaning requests will be routed to + * Creates a SessionedClient instance to this {@code Cluster}, meaning requests will be routed to * a single server (randomly selected from the cluster), where the same bindings will be available on each request. * Requests are bound to the same thread on the server and thus transactions may extend beyond the bounds of a * single request. The transactions are managed by the user and must be committed or rolled-back manually. @@ -121,11 +104,11 @@ public final class Cluster { * @param sessionId user supplied id for the session which should be unique (a UUID is ideal). */ public <T extends Client> T connect(final String sessionId) { - return connect(sessionId, false); + throw new UnsupportedOperationException("not implemented"); } /** - * Creates a {@link Client.SessionedClient} instance to this {@code Cluster}, meaning requests will be routed to + * Creates a SessionedClient instance to this {@code Cluster}, meaning requests will be routed to * a single server (randomly selected from the cluster), where the same bindings will be available on each request. * Requests are bound to the same thread on the server and thus transactions may extend beyond the bounds of a * single request. 
If {@code manageTransactions} is set to {@code false} then transactions are managed by the @@ -141,19 +124,14 @@ public final class Cluster { * @param manageTransactions enables auto-transactions when set to true */ public <T extends Client> T connect(final String sessionId, final boolean manageTransactions) { - final Client.SessionSettings sessionSettings = Client.SessionSettings.build() - .manageTransactions(manageTransactions) - .sessionId(sessionId).create(); - final Client.Settings settings = Client.Settings.build().useSession(sessionSettings).create(); - return connect(settings); + throw new UnsupportedOperationException("not implemented"); } /** * Creates a new {@link Client} based on the settings provided. */ - public <T extends Client> T connect(final Client.Settings settings) { - final Client client = settings.getSession().isPresent() ? new Client.SessionedClient(this, settings) : - new Client.ClusteredClient(this, settings); + public <T extends Client> T connect() { + final Client client = new Client.ClusteredClient(this); manager.trackClient(client); return (T) client; } diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java index c8e5d2a6cb..8e0ae0f4cf 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java @@ -18,7 +18,6 @@ */ package org.apache.tinkerpop.gremlin.driver; -import org.apache.tinkerpop.gremlin.util.Tokens; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException; @@ -31,17 +30,12 @@ import io.netty.channel.socket.nio.NioSocketChannel; import java.net.URI; import java.time.Instant; -import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import 
java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; /** @@ -50,32 +44,28 @@ import java.util.concurrent.atomic.AtomicReference; * @author Stephen Mallette (http://stephen.genoprime.com) */ final class Connection { + public static final int MAX_WAIT_FOR_CONNECTION = 16000; + public static final int MAX_WAIT_FOR_CLOSE = 3000; + public static final int MAX_CONTENT_LENGTH = 10 * 1024 * 1024; + public static final int RECONNECT_INTERVAL = 1000; + public static final int RESULT_ITERATION_BATCH_SIZE = 64; + public static final long KEEP_ALIVE_INTERVAL = 180000; + public static final long CONNECTION_SETUP_TIMEOUT_MILLIS = 15000; private static final Logger logger = LoggerFactory.getLogger(Connection.class); private final Channel channel; private final URI uri; - private final ConcurrentMap<UUID, ResultQueue> pending = new ConcurrentHashMap<>(); + private final AtomicReference<ResultQueue> pending = new AtomicReference<>(); private final Cluster cluster; private final Client client; private final ConnectionPool pool; private final String creatingThread; private final String createdTimestamp; - public static final int MAX_WAIT_FOR_CONNECTION = 16000; - public static final int MAX_WAIT_FOR_CLOSE = 3000; - public static final int MAX_CONTENT_LENGTH = 10 * 1024 * 1024; - - public static final int RECONNECT_INTERVAL = 1000; - public static final int RESULT_ITERATION_BATCH_SIZE = 64; - public static final long KEEP_ALIVE_INTERVAL = 180000; - public final static long CONNECTION_SETUP_TIMEOUT_MILLIS = 15000; - /** - * When a {@code Connection} is borrowed from the pool, this number is incremented to indicate the number 
of - * times it has been taken and is decremented when it is returned. This number is one indication as to how - * busy a particular {@code Connection} is. + * Is a {@code Connection} borrowed from the pool. */ - public final AtomicInteger borrowed = new AtomicInteger(0); + private final AtomicBoolean isBorrowed = new AtomicBoolean(false); /** * This boolean guards the replace of the connection and ensures that it only occurs once. */ @@ -149,6 +139,10 @@ final class Connection { return (channel != null && !channel.isActive()); } + public AtomicBoolean isBorrowed() { + return isBorrowed; + } + boolean isClosing() { return closeFuture.get() != null; } @@ -165,7 +159,7 @@ final class Connection { return client; } - ConcurrentMap<UUID, ResultQueue> getPending() { + AtomicReference<ResultQueue> getPending() { return pending; } @@ -195,10 +189,6 @@ final class Connection { } public ChannelPromise write(final RequestMessageV4 requestMessage, final CompletableFuture<ResultSet> resultQueueSetup) { - // dont allow the same request id to be used as one that is already in the queue - if (pending.containsKey(requestMessage.getRequestId())) - throw new IllegalStateException(String.format("There is already a request pending with an id of: %s", requestMessage.getRequestId())); - // once there is a completed write, then create a traverser for the result set and complete // the promise so that the client knows that that it can start checking for results. 
final Connection thisConnection = this; @@ -238,7 +228,8 @@ final class Connection { }, cluster.executor()); final ResultQueue handler = new ResultQueue(resultLinkedBlockingQueue, readCompleted); - pending.put(requestMessage.getRequestId(), handler); + // pending.put(requestMessage.getRequestId(), handler); + pending.set(handler); // resultQueueSetup should only be completed by a worker since the application code might have sync // completion stages attached to it which and we do not want the event loop threads to process those @@ -270,7 +261,7 @@ final class Connection { } private boolean isOkToClose() { - return pending.isEmpty() || (channel != null && !channel.isOpen()) || !pool.host.isAvailable(); + return pending.get() == null || (channel != null && !channel.isOpen()) || !pool.host.isAvailable(); } /** @@ -379,9 +370,9 @@ final class Connection { public String getConnectionInfo(final boolean showHost) { return showHost ? String.format("Connection{channel=%s host=%s isDead=%s borrowed=%s pending=%s markedReplaced=%s closing=%s created=%s thread=%s}", - getChannelId(), pool.host.toString(), isDead(), this.borrowed.get(), getPending().size(), this.isBeingReplaced, isClosing(), createdTimestamp, creatingThread) : + getChannelId(), pool.host.toString(), isDead(), this.isBorrowed().get(), getPending().get() == null ? 0 : 1, this.isBeingReplaced, isClosing(), createdTimestamp, creatingThread) : String.format("Connection{channel=%s isDead=%s borrowed=%s pending=%s markedReplaced=%s closing=%s created=%s thread=%s}", - getChannelId(), isDead(), this.borrowed.get(), getPending().size(), this.isBeingReplaced, isClosing(), createdTimestamp, creatingThread); + getChannelId(), isDead(), this.isBorrowed().get(), getPending().get() == null ? 
0 : 1, this.isBeingReplaced, isClosing(), createdTimestamp, creatingThread); } /** diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java index 37ad1ac936..07e2d01dfa 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ConnectionPool.java @@ -183,7 +183,7 @@ final class ConnectionPool { } // Get the least used valid connection - final Connection leastUsedConn = getLeastUsedValidConnection(); + final Connection leastUsedConn = getAvailableConnection(); if (null == leastUsedConn) { if (isClosed()) @@ -201,13 +201,13 @@ final class ConnectionPool { logger.debug("Attempting to return {} on {}", connection, host); if (isClosed()) throw new ConnectionException(host.getHostUri(), host.getAddress(), "Pool is shutdown"); - final int borrowed = connection.borrowed.decrementAndGet(); + connection.isBorrowed().set(false); if (connection.isDead()) { logger.debug("Marking {} as dead", this.host); this.replaceConnection(connection); } else { - if (bin.contains(connection) && borrowed == 0) { + if (bin.contains(connection)) { logger.debug("{} is already in the bin and it has no inflight requests so it is safe to close", connection); if (bin.remove(connection)) connection.closeAsync(); @@ -215,9 +215,9 @@ final class ConnectionPool { } final int poolSize = connections.size(); - if (poolSize > minPoolSize ) { + if (poolSize > minPoolSize) { if (logger.isDebugEnabled()) - logger.debug("destroy {}",connection.getConnectionInfo()); + logger.debug("destroy {}", connection.getConnectionInfo()); destroyConnection(connection); } else if (maxPoolSize > 1) { if (logger.isDebugEnabled()) @@ -397,7 +397,7 @@ final class ConnectionPool { } // only close the connection for good once it is done being borrowed or when it is dead - if (connection.isDead() 
|| connection.borrowed.get() == 0) { + if (connection.isDead() || !connection.isBorrowed().get()) { if (bin.remove(connection)) { final CompletableFuture<Void> closeFuture = connection.closeAsync(); closeFuture.whenComplete((v, t) -> { @@ -422,7 +422,7 @@ final class ConnectionPool { if (isClosed()) throw new ConnectionException(host.getHostUri(), host.getAddress(), "Pool is shutdown"); - final Connection leastUsed = getLeastUsedValidConnection(); + final Connection leastUsed = getAvailableConnection(); if (leastUsed != null) { if (logger.isDebugEnabled()) @@ -547,30 +547,21 @@ final class ConnectionPool { * @return The least-used connection from the pool. Returns null if no valid connection could be retrieved from the * pool. */ - private synchronized Connection getLeastUsedValidConnection() { - // todo: just take first unused - int minInFlight = Integer.MAX_VALUE; + private synchronized Connection getAvailableConnection() { Connection leastBusy = null; + int availableConnectionsCount = 0; for (Connection connection : connections) { - final int inFlight = connection.borrowed.get(); - if (!connection.isDead() && inFlight < minInFlight && inFlight < 1) { - minInFlight = inFlight; - leastBusy = connection; + if (!connection.isDead() && !connection.isBorrowed().get()) { + // try to borrow connection + if (leastBusy == null && connection.isBorrowed().compareAndSet(false, true)) { + leastBusy = connection; + } + availableConnectionsCount++; } } - if (leastBusy != null) { - // Increment borrow count and consider making a new connection if least used connection hits usage maximum - if (leastBusy.borrowed.incrementAndGet() > 0 - && connections.size() < maxPoolSize) { - if (logger.isDebugEnabled()) - logger.debug("Least used {} on {} reached maxSimultaneousUsagePerConnection but pool size {} < maxPoolSize - consider new connection", - leastBusy.getConnectionInfo(), host, connections.size()); - considerNewConnection(); - } - } else if (connections.size() < maxPoolSize) { - 
// A safeguard for scenarios where consideration of a new connection was somehow not triggered by an - // existing connection hitting the usage maximum + // todo: may be add new connection when only 10-20% of pool is available? + if (availableConnectionsCount == 0 && connections.size() < maxPoolSize) { considerNewConnection(); } diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java index 74a29cfa67..7c516de775 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java @@ -21,7 +21,6 @@ package org.apache.tinkerpop.gremlin.driver.handler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.HttpResponseStatus; -import io.netty.util.AttributeMap; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.tinkerpop.gremlin.driver.Result; import org.apache.tinkerpop.gremlin.driver.ResultQueue; @@ -36,10 +35,7 @@ import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ConcurrentMap; - -import static org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinRequestEncoder.REQUEST_ID; +import java.util.concurrent.atomic.AtomicReference; /** * Takes a map of requests pending responses and writes responses to the {@link ResultQueue} of a request @@ -47,9 +43,9 @@ import static org.apache.tinkerpop.gremlin.driver.handler.HttpGremlinRequestEnco */ public class GremlinResponseHandler extends SimpleChannelInboundHandler<ResponseMessage> { private static final Logger logger = LoggerFactory.getLogger(GremlinResponseHandler.class); - private final ConcurrentMap<UUID, 
ResultQueue> pending; + private final AtomicReference<ResultQueue> pending; - public GremlinResponseHandler(final ConcurrentMap<UUID, ResultQueue> pending) { + public GremlinResponseHandler(final AtomicReference<ResultQueue> pending) { this.pending = pending; } @@ -59,18 +55,16 @@ public class GremlinResponseHandler extends SimpleChannelInboundHandler<Response // should fire off a close message which will properly release the driver. super.channelInactive(ctx); - // the channel isn't going to get anymore results as it is closed so release all pending requests - pending.values().forEach(val -> val.markError(new IllegalStateException("Connection to server is no longer active"))); - pending.clear(); + final ResultQueue current = pending.getAndSet(null); + if (current != null) { + current.markError(new IllegalStateException("Connection to server is no longer active")); + } } @Override protected void channelRead0(final ChannelHandlerContext channelHandlerContext, final ResponseMessage response) throws Exception { - final UUID requestId = ((AttributeMap) channelHandlerContext).attr(REQUEST_ID).get(); - final HttpResponseStatus statusCode = response.getStatus() == null ? HttpResponseStatus.PARTIAL_CONTENT : response.getStatus().getCode(); - final ResultQueue queue = pending.get(requestId); - System.out.println("GremlinResponseHandler get requestId: " + requestId); + final ResultQueue queue = pending.get(); if (response.getResult().getData() != null) { System.out.println("GremlinResponseHandler payload size: " + ((List) response.getResult().getData()).size()); } @@ -104,7 +98,10 @@ public class GremlinResponseHandler extends SimpleChannelInboundHandler<Response // todo: // as this is a non-PARTIAL_CONTENT code - the stream is done. 
if (statusCode != HttpResponseStatus.PARTIAL_CONTENT) { - pending.remove(requestId).markComplete(response.getStatus().getAttributes()); + final ResultQueue current = pending.getAndSet(null); + if (current != null) { + current.markComplete(response.getStatus().getAttributes()); + } } System.out.println("----------------------------"); @@ -117,9 +114,7 @@ public class GremlinResponseHandler extends SimpleChannelInboundHandler<Response // there are that many failures someone would take notice and hopefully stop the client. logger.error("Could not process the response", cause); - // the channel took an error because of something pretty bad so release all the futures out there - pending.values().forEach(val -> val.markError(cause)); - pending.clear(); + pending.getAndSet(null).markError(cause); // serialization exceptions should not close the channel - that's worth a retry if (!IteratorUtils.anyMatch(ExceptionUtils.getThrowableList(cause).iterator(), t -> t instanceof SerializationException)) diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java index 5e0467a368..4ff552ca65 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpGremlinRequestEncoder.java @@ -28,9 +28,6 @@ import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpVersion; -import io.netty.util.Attribute; -import io.netty.util.AttributeKey; -import io.netty.util.AttributeMap; import org.apache.tinkerpop.gremlin.driver.UserAgent; import org.apache.tinkerpop.gremlin.driver.exception.ResponseException; import org.apache.tinkerpop.gremlin.process.traversal.Bytecode; @@ 
-40,7 +37,6 @@ import org.apache.tinkerpop.gremlin.util.ser.MessageTextSerializerV4; import org.apache.tinkerpop.gremlin.util.ser.SerTokens; import java.util.List; -import java.util.UUID; import java.util.function.UnaryOperator; /** @@ -49,8 +45,6 @@ import java.util.function.UnaryOperator; @ChannelHandler.Sharable public final class HttpGremlinRequestEncoder extends MessageToMessageEncoder<RequestMessageV4> { - //todo: move - public static final AttributeKey<UUID> REQUEST_ID = AttributeKey.valueOf("requestId"); private final MessageSerializer<?> serializer; private final boolean userAgentEnabled; private final UnaryOperator<FullHttpRequest> interceptor; @@ -70,10 +64,6 @@ public final class HttpGremlinRequestEncoder extends MessageToMessageEncoder<Req @Override protected void encode(final ChannelHandlerContext channelHandlerContext, final RequestMessageV4 requestMessage, final List<Object> objects) throws Exception { - final Attribute<UUID> requestIdAttribute = ((AttributeMap) channelHandlerContext).attr(REQUEST_ID); - requestIdAttribute.set(requestMessage.getRequestId()); - System.out.println("HttpGremlinRequestEncoder set requestId: " + requestIdAttribute.get()); - final String mimeType = serializer.mimeTypesSupported()[0]; // only GraphSON3 and GraphBinary recommended for serialization of Bytecode requests if (requestMessage.getField("gremlin") instanceof Bytecode && diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java index 99d4205005..7d249c8bc6 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java @@ -35,13 +35,12 @@ import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; import java.util.Iterator; import java.util.Map; import 
java.util.Optional; -import java.util.UUID; import java.util.concurrent.CompletableFuture; import static org.apache.tinkerpop.gremlin.util.Tokens.ARGS_BATCH_SIZE; import static org.apache.tinkerpop.gremlin.util.Tokens.ARGS_EVAL_TIMEOUT; -import static org.apache.tinkerpop.gremlin.util.Tokens.ARGS_USER_AGENT; import static org.apache.tinkerpop.gremlin.util.Tokens.ARGS_MATERIALIZE_PROPERTIES; +import static org.apache.tinkerpop.gremlin.util.Tokens.ARGS_USER_AGENT; /** @@ -80,7 +79,7 @@ public class DriverRemoteConnection implements RemoteConnection { cluster = conf.containsKey(GREMLIN_REMOTE_DRIVER_CLUSTERFILE) ? Cluster.open(conf.getString(GREMLIN_REMOTE_DRIVER_CLUSTERFILE)) : Cluster.open(conf.subset("clusterConfiguration")); - client = cluster.connect(Client.Settings.build().create()).alias(remoteTraversalSourceName); + client = cluster.connect().alias(remoteTraversalSourceName); } catch (Exception ex) { throw new IllegalStateException(ex); } @@ -93,7 +92,7 @@ public class DriverRemoteConnection implements RemoteConnection { } private DriverRemoteConnection(final Cluster cluster, final boolean tryCloseCluster, final String remoteTraversalSourceName) { - client = cluster.connect(Client.Settings.build().create()).alias(remoteTraversalSourceName); + client = cluster.connect().alias(remoteTraversalSourceName); this.remoteTraversalSourceName = remoteTraversalSourceName; this.tryCloseCluster = tryCloseCluster; attachElements = false; @@ -108,7 +107,7 @@ public class DriverRemoteConnection implements RemoteConnection { attachElements = conf.containsKey(GREMLIN_REMOTE + "attachment"); - client = cluster.connect(Client.Settings.build().create()).alias(remoteTraversalSourceName); + client = cluster.connect().alias(remoteTraversalSourceName); tryCloseCluster = false; tryCloseClient = true; this.conf = Optional.of(conf); @@ -237,10 +236,7 @@ public class DriverRemoteConnection implements RemoteConnection { * If the connection is bound to a session, then get the session 
identifier from it. */ Optional<String> getSessionId() { - if (client instanceof Client.SessionedClient) { - Client.SessionedClient c = (Client.SessionedClient) client; - return Optional.of(c.getSessionId()); - } + // todo: not implemented return Optional.empty(); } @@ -281,8 +277,9 @@ public class DriverRemoteConnection implements RemoteConnection { */ @Override public Transaction tx() { + // todo: not implemented final DriverRemoteConnection session = new DriverRemoteConnection( - client.getCluster().connect(UUID.randomUUID().toString()), remoteTraversalSourceName, true); + client.getCluster().connect(), remoteTraversalSourceName, true); return new DriverRemoteTransaction(session); } diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ConnectionPoolTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ConnectionPoolTest.java new file mode 100644 index 0000000000..01aa1f695e --- /dev/null +++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ConnectionPoolTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.tinkerpop.gremlin.driver; + +import org.apache.commons.lang3.concurrent.BasicThreadFactory; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class ConnectionPoolTest { + + @Test + public void shouldCreateAndScalePool() throws TimeoutException { + final AtomicInteger connectionsCreated = new AtomicInteger(0); + + final Connection mockConn0 = mock(Connection.class); + when(mockConn0.isBorrowed()).thenReturn(new AtomicBoolean(false)); + final Connection mockConn1 = mock(Connection.class); + when(mockConn1.isBorrowed()).thenReturn(new AtomicBoolean(false)); + final List<Connection> mockConns = Arrays.asList(mockConn0, mockConn1); + + final ConnectionFactory connectionFactory = mock(ConnectionFactory.class); + when(connectionFactory.create(any(ConnectionPool.class))) + .thenAnswer(i -> mockConns.get(connectionsCreated.getAndIncrement())); + + final Cluster cluster = mock(Cluster.class); + when(cluster.connectionPoolSettings()).thenReturn(new Settings.ConnectionPoolSettings()); + final ScheduledThreadPoolExecutor connectionScheduler = new ScheduledThreadPoolExecutor(2, + new BasicThreadFactory.Builder().namingPattern("gremlin-driver-conn-scheduler-%d").build()); + when(cluster.connectionScheduler()).thenReturn(connectionScheduler); + + final Host host = mock(Host.class); + + final Client client = new Client.ClusteredClient(cluster); + + // create pool with 1 connection + final 
ConnectionPool connectionPool = new ConnectionPool(host, client, + Optional.of(1), Optional.of(2), connectionFactory); + // try to borrow this connection. + final Connection conn0 = connectionPool.borrowConnection(100, TimeUnit.MILLISECONDS); + + assertNotNull(connectionPool); + assertNotNull(conn0); + assertEquals(1, connectionsCreated.get()); + + // try to borrow connection. conn0 is mocked as borrowed, so should create new one + when(mockConn0.isBorrowed()).thenReturn(new AtomicBoolean(true)); + final Connection conn1 = connectionPool.borrowConnection(100, TimeUnit.MILLISECONDS); + + assertNotNull(conn1); + assertEquals(2, connectionsCreated.get()); + + // mark conn1 as borrowed and try to get one more connection + when(mockConn1.isBorrowed()).thenReturn(new AtomicBoolean(true)); + try { + connectionPool.borrowConnection(1000, TimeUnit.MILLISECONDS); + fail("Pool already at fool capacity, connection can't be added"); + } catch (TimeoutException te) { + assertEquals(2, connectionsCreated.get()); + } + + // return conn0 to pool, can be borrowed again + when(mockConn0.isBorrowed()).thenReturn(new AtomicBoolean(false)); + final Connection conn00 = connectionPool.borrowConnection(100, TimeUnit.MILLISECONDS); + + assertNotNull(conn00); + assertEquals(2, connectionsCreated.get()); + } +} diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/HostTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/HostTest.java index 2c20261335..066a77568d 100644 --- a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/HostTest.java +++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/HostTest.java @@ -35,7 +35,7 @@ public class HostTest { final InetSocketAddress addy = new InetSocketAddress("localhost", 8182); final Host host = new Host(addy, Cluster.open()); final URI webSocketUri = host.getHostUri(); - assertEquals("ws://localhost:8182/gremlin", webSocketUri.toString()); + 
assertEquals("http://localhost:8182/gremlin", webSocketUri.toString()); } @Test @@ -43,7 +43,7 @@ public class HostTest { final InetSocketAddress addy = new InetSocketAddress("localhost", 8183); final Host host = new Host(addy, Cluster.build().port(8183).path("/argh").create()); final URI webSocketUri = host.getHostUri(); - assertEquals("ws://localhost:8183/argh", webSocketUri.toString()); + assertEquals("http://localhost:8183/argh", webSocketUri.toString()); } } diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/WebSocketClientBehaviorIntegrateTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/WebSocketClientBehaviorIntegrateTest.java index f70e60c5e4..cd0ce0b97d 100644 --- a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/WebSocketClientBehaviorIntegrateTest.java +++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/WebSocketClientBehaviorIntegrateTest.java @@ -56,481 +56,481 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; -public class WebSocketClientBehaviorIntegrateTest { - private static final Logger logger = LoggerFactory.getLogger(WebSocketClientBehaviorIntegrateTest.class); - - @Rule - public TestName name = new TestName(); - - private static LogCaptor logCaptor; - - private final SocketServerSettings settings; - - private SimpleSocketServer server; - - public WebSocketClientBehaviorIntegrateTest() throws IOException { - settings = SocketServerSettings.read(FileSystems.getDefault().getPath("..","gremlin-tools", "gremlin-socket-server", "conf", "test-ws-gremlin.yaml")); - settings.SERIALIZER = "GraphSONV2"; - } - - @BeforeClass - public static void setupLogCaptor() { - logCaptor = LogCaptor.forRoot(); - } - - @AfterClass - public static void tearDown() { - logCaptor.close(); - } - - @Before - public void setUp() throws InterruptedException { - logCaptor.clearLogs(); - - server = new 
SimpleSocketServer(settings); - if (name.getMethodName().equals("shouldAttemptHandshakeForLongerThanDefaultNettySslHandshakeTimeout") || - name.getMethodName().equals("shouldPrintCorrectErrorForRegularWebSocketHandshakeTimeout")) { - server.start(new TestChannelizers.TestWSNoOpInitializer()); - } else if (name.getMethodName().equals("shouldContinueRunningRemainingConnectionsIfServerThrottlesNewConnections") || - name.getMethodName().equals("shouldReturnCorrectExceptionIfServerThrottlesNewConnectionsAndMaxWaitExceeded")) { - server.start(new TestChannelizers.TestConnectionThrottlingInitializer(settings)); - } else { - server.start(new TestWSGremlinInitializer(settings)); - } - } - - @After - public void shutdown() { - server.stop(); - } - - /** - * Tests that client is correctly sending user agent during web socket handshake by having the server return - * the captured user agent. - */ - @Test - public void shouldIncludeUserAgentInHandshakeRequest() { - final Cluster cluster = Cluster.build("localhost").port(settings.PORT) - .minConnectionPoolSize(1) - .maxConnectionPoolSize(1) - .serializer(Serializers.GRAPHSON_V2) - .create(); - final Client.ClusteredClient client = cluster.connect(); - - // trigger the testing server to return captured user agent - String returnedUserAgent = client.submit("1", RequestOptions.build() - .overrideRequestId(settings.USER_AGENT_REQUEST_ID).create()).one().getString(); - assertEquals(UserAgent.USER_AGENT, returnedUserAgent); - } - - /** - * Tests that no user agent is sent to server when that behaviour is disabled. 
- */ - @Test - public void shouldNotIncludeUserAgentInHandshakeRequestIfDisabled() { - final Cluster cluster = Cluster.build("localhost").port(settings.PORT) - .minConnectionPoolSize(1) - .maxConnectionPoolSize(1) - .serializer(Serializers.GRAPHSON_V2) - .enableUserAgentOnConnect(false) - .create(); - final Client.ClusteredClient client = cluster.connect(); - - // trigger the testing server to return captured user agent - String returnedUserAgent = client.submit("1", RequestOptions.build() - .overrideRequestId(settings.USER_AGENT_REQUEST_ID).create()).one().getString(); - assertEquals("", returnedUserAgent); - } - - /** - * Constructs a deadlock situation when initializing a {@link Client} object in sessionless form that leads to - * hanging behavior in low resource environments (TINKERPOP-2504) and for certain configurations of the - * {@link Cluster} object where there are simply not enough threads to properly allow the {@link Host} and its - * related {@link ConnectionPool} objects to spin up properly - see TINKERPOP-2550. - */ - @Test - public void shouldNotDeadlockOnInitialization() throws Exception { - // it seems you can add the same host more than once so while kinda weird it is helpful in faithfully - // recreating the deadlock situation, though it can/will happen with just one host. workerPoolSize at - // "1" also helps faithfully reproduce the problem though it can happen at larger pool sizes depending - // on the timing/interleaving of tasks. the larger connection pool sizes may not be required given the - // other settings at play but again, just trying to make sure the deadlock state is consistently produced - // and a larger pool size will mean more time to elapse scheduling connection creation tasks which may - // further improve chances of scheduling conflicts that produce the deadlock. 
- // - // to force this test to a fail state, change ClusteredClient.initializeImplementation() to use the - // standard Cluster.executor rather than the hostExecutor (which is a single threaded independent thread - // pool used just for the purpose of initializing the hosts). - final Cluster cluster = Cluster.build("localhost"). - addContactPoint("localhost"). - addContactPoint("localhost").port(settings.PORT). - workerPoolSize(1). - minConnectionPoolSize(32).maxConnectionPoolSize(32).create(); - - final AtomicBoolean failed = new AtomicBoolean(false); - final ExecutorService executor = Executors.newSingleThreadExecutor(); - executor.submit(() -> { - try { - final Client client = cluster.connect(); - - // test will hang in init() where the Host and ConnectionPool are started up - client.init(); - } catch (Exception ex) { - // should not "fail" - just hang and then timeout during the executor shutdown as there is - // a deadlock state, but we have this here just in case. a failed assertion of this value - // below could be interesting - logger.error("Client initialization failed with exception which was unexpected", ex); - failed.set(true); - } finally { - cluster.close(); - } - }); - - executor.shutdown(); - - // 30 seconds should be ample time, even for travis. the deadlock state happens quite immediately in - // testing and in most situations this test should zip by in subsecond pace - assertThat(executor.awaitTermination(30, TimeUnit.SECONDS), is(true)); - assertThat(failed.get(), is(false)); - } - - /** - * Test a scenario when server closes a connection which does not have any active requests. Such connection - * should be destroyed and replaced by another connection on next request. 
- */ - @Test - public void shouldRemoveConnectionFromPoolWhenServerClose_WithNoPendingRequests() throws InterruptedException { - final Cluster cluster = Cluster.build("localhost").port(settings.PORT) - .minConnectionPoolSize(1) - .maxConnectionPoolSize(1) - .serializer(Serializers.GRAPHSON_V2) - .create(); - final Client.ClusteredClient client = cluster.connect(); - - // Initialize the client preemptively - client.init(); - - // assert number of connections opened - final ConnectionPool channelPool = client.hostConnectionPools.values().stream().findFirst().get(); - assertEquals(1, channelPool.getConnectionIDs().size()); - - final String originalConnectionID = channelPool.getConnectionIDs().iterator().next(); - logger.info("On client init ConnectionIDs: " + channelPool.getConnectionIDs()); - - // trigger the testing server to send a WS close frame - Vertex v = client.submit("1", RequestOptions.build() - .overrideRequestId(settings.SINGLE_VERTEX_DELAYED_CLOSE_CONNECTION_REQUEST_ID).create()) - .one().getVertex(); - - assertNotNull(v); - - // assert connection is not closed yet - assertEquals(1, channelPool.getConnectionIDs().size()); - - // wait for server to send the close WS frame - Thread.sleep(6000); - - // assert that original connection is not part of the connection pool any more - assertThat("The original connection should have been closed by the server.", - channelPool.getConnectionIDs().contains(originalConnectionID), is(false)); - - // assert sanity after connection replacement - v = client.submit("1", - RequestOptions.build().overrideRequestId(settings.SINGLE_VERTEX_REQUEST_ID).create()) - .one().getVertex(); - assertNotNull(v); - } - - /** - * Tests a scenario when the connection a faulty connection replaced by a new connection. - * Ensures that the creation of a new replacement channel only happens once. 
- */ - @Test - public void shouldRemoveConnectionFromPoolWhenServerClose_WithPendingRequests() throws InterruptedException, ExecutionException { - final Cluster cluster = Cluster.build("localhost").port(settings.PORT) - .minConnectionPoolSize(1) - .maxConnectionPoolSize(1) - .serializer(Serializers.GRAPHSON_V2) - .create(); - - final Client.ClusteredClient client = cluster.connect(); - - // Initialize the client preemptively - client.init(); - - // assert number of connections opened - final ConnectionPool channelPool = client.hostConnectionPools.values().stream().findFirst().get(); - assertEquals(1, channelPool.getConnectionIDs().size()); - - // Send two requests in flight. Both should error out. - final CompletableFuture<ResultSet> req1 = client.submitAsync("1", RequestOptions.build() - .overrideRequestId(settings.CLOSE_CONNECTION_REQUEST_ID).create()); - final CompletableFuture<ResultSet> req2 = client.submitAsync("1", RequestOptions.build() - .overrideRequestId(settings.CLOSE_CONNECTION_REQUEST_ID_2).create()); - - - // assert both are sent on same connection - assertEquals(1, channelPool.getConnectionIDs().size()); - - // trigger write for both requests - req1.get(); - req2.get(); - - // wait for close message to arrive from server - Thread.sleep(2000); - - // Assert that we should consider creating a connection only once, since only one connection is being closed. - assertEquals(1, logCaptor.getLogs().stream().filter(str -> str.contains("Considering new connection on")).count()); - - // assert sanity after connection replacement - final Vertex v = client.submit("1", - RequestOptions.build().overrideRequestId(settings.SINGLE_VERTEX_REQUEST_ID).create()) - .one().getVertex(); - assertNotNull(v); - } - - /** - * Tests the scenario when client intentionally closes the connection. In this case, the - * connection should not be recycled. 
- */ - @Test - public void shouldNotCreateReplacementConnectionWhenClientClosesConnection() throws ExecutionException, InterruptedException { - final Cluster cluster = Cluster.build("localhost").port(settings.PORT) - .minConnectionPoolSize(1) - .maxConnectionPoolSize(1) - .serializer(Serializers.GRAPHSON_V2) - .create(); - final Client.ClusteredClient client = cluster.connect(); - - // Initialize the client preemptively - client.init(); - - // Clearing logCaptor before attempting to close the connection is in response to an issue where this test can - // be polluted by logs from a previous test when running on slow hardware. - logCaptor.clearLogs(); - - // assert number of connections opened - final ConnectionPool channelPool = client.hostConnectionPools.values().stream().findFirst().get(); - assertEquals(1, channelPool.getConnectionIDs().size()); - - // close the connection pool in an authentic manner - channelPool.closeAsync().get(); - - // wait for channel closure callback to trigger - Thread.sleep(2000); - - assertEquals("OnClose callback should be called but only once", 1, - logCaptor.getLogs().stream().filter(str -> str.contains("OnChannelClose callback called for channel")).count()); - - assertEquals("No new connection creation should be started", 0, - logCaptor.getLogs().stream().filter(str -> str.contains("Considering new connection on")).count()); - } - - /** - * (TINKERPOP-2814) Tests to make sure that the SSL handshake is now capped by connectionSetupTimeoutMillis and not - * the default Netty SSL handshake timeout of 10,000ms. - */ - @Test - public void shouldAttemptHandshakeForLongerThanDefaultNettySslHandshakeTimeout() { - final Cluster cluster = Cluster.build("localhost").port(settings.PORT) - .minConnectionPoolSize(1) - .maxConnectionPoolSize(1) - .connectionSetupTimeoutMillis(20000) // needs to be larger than 10000ms. 
- .enableSsl(true) - .create(); - - final Client.ClusteredClient client = cluster.connect(); - final long start = System.currentTimeMillis(); - - Exception caught = null; - try { - client.submit("1"); - } catch (Exception e) { - caught = e; - } finally { - // Test against 15000ms which should give a big enough buffer to avoid timing issues. - assertTrue(System.currentTimeMillis() - start > 15000); - assertTrue(caught != null); - assertTrue(caught instanceof NoHostAvailableException); - assertTrue(logCaptor.getLogs().stream().anyMatch(str -> str.contains("SSL handshake not completed"))); - } - - cluster.close(); - } - - /** - * Tests to make sure that the correct error message is logged when a non-SSL connection attempt times out. - */ - @Test - public void shouldPrintCorrectErrorForRegularWebSocketHandshakeTimeout() throws InterruptedException { - final Cluster cluster = Cluster.build("localhost").port(settings.PORT) - .minConnectionPoolSize(1) - .maxConnectionPoolSize(1) - .connectionSetupTimeoutMillis(120) - .create(); - - final Client.ClusteredClient client = cluster.connect(); - - Exception caught = null; - try { - client.submit("1"); - } catch (Exception e) { - caught = e; - } finally { - assertTrue(caught != null); - assertTrue(caught instanceof NoHostAvailableException); - Thread.sleep(150); - assertTrue(logCaptor.getLogs().stream().anyMatch(str -> str.contains("WebSocket handshake not completed"))); - } - - cluster.close(); - } - - /** - * Tests that if a server throttles new connections (doesn't allow new connections to be made) then all requests - * will run and complete on the connections that are already open. 
- */ - @Test - public void shouldContinueRunningRemainingConnectionsIfServerThrottlesNewConnections() throws ExecutionException, InterruptedException, TimeoutException { - final Cluster cluster = Cluster.build("localhost").port(settings.PORT) - .minConnectionPoolSize(1) - .maxConnectionPoolSize(5) - .maxWaitForConnection(15000) // large value ensures that request will eventually find a connection. - .connectionSetupTimeoutMillis(1000) - .serializer(Serializers.GRAPHSON_V2) - .create(); - - final Client.ClusteredClient client = cluster.connect(); - - final List<CompletableFuture<ResultSet>> results = new ArrayList<CompletableFuture<ResultSet>>(); - for (int i = 0; i < 5; i++) { - results.add(client.submitAsync("500")); - } - - for (CompletableFuture<ResultSet> result : results) { - assertNotNull(result.get(60000, TimeUnit.MILLISECONDS).one().getVertex()); - } - - cluster.close(); - } - - /** - * Tests that if a server throttles new connections (doesn't allow new connections to be made) then any request - * that can't find a connection within its maxWaitForConnection will return an informative exception regarding - * the inability to open new connections. - */ - @Test - public void shouldReturnCorrectExceptionIfServerThrottlesNewConnectionsAndMaxWaitExceeded() { - final Cluster cluster = Cluster.build("localhost").port(settings.PORT) - .minConnectionPoolSize(1) - .maxConnectionPoolSize(5) - .maxWaitForConnection(250) // small value ensures that requests will return TimeoutException. 
- .connectionSetupTimeoutMillis(100) - .serializer(Serializers.GRAPHSON_V2) - .create(); - - final Client.ClusteredClient client = cluster.connect(); - - for (int i = 0; i < 5; i++) { - try { - client.submitAsync("3000"); - } catch (Exception e) { - final Throwable rootCause = ExceptionHelper.getRootCause(e); - assertTrue(rootCause instanceof TimeoutException); - assertTrue(rootCause.getMessage().contains("WebSocket handshake not completed")); - } - } - - cluster.close(); - } - - /** - * Tests that the client continues to work if the server temporarily goes down between two requests. - */ - @Test - public void shouldContinueRunningIfServerGoesDownTemporarily() throws InterruptedException { - final Cluster cluster = Cluster.build("localhost").port(settings.PORT) - .minConnectionPoolSize(1) - .serializer(Serializers.GRAPHSON_V2) - .create(); - - final Client.ClusteredClient client = cluster.connect(); - final Object lock = new Object(); - - final ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(1); - scheduledPool.schedule(() -> { - try { - server.stopSync(); - server = new SimpleSocketServer(settings); - server.start(new TestWSGremlinInitializer(settings)); - synchronized (lock) { - lock.notify(); - } - } catch (InterruptedException ignored) { - // Ignored. - } - }, 1000, TimeUnit.MILLISECONDS); - - synchronized (lock) { - assertNotNull(client.submit("1").one().getVertex()); - lock.wait(30000); - } - - assertNotNull(client.submit("1").one().getVertex()); - - cluster.close(); - } - - /** - * Tests that if the host is unavailable then the client will return an exception that contains information about - * the status of the host. 
- */ - @Test - public void shouldReturnCorrectExceptionIfServerGoesDown() throws InterruptedException { - final Cluster cluster = Cluster.build("localhost").port(settings.PORT) - .minConnectionPoolSize(1) - .maxWaitForConnection(500) - .connectionSetupTimeoutMillis(100) - .serializer(Serializers.GRAPHSON_V2) - .create(); - - final Client.ClusteredClient client = cluster.connect(); - client.submit("1"); - - server.stopSync(); - - try { - client.submit("1"); - } catch (Exception e) { - final Throwable rootCause = ExceptionHelper.getRootCause(e); - assertTrue(rootCause instanceof TimeoutException); - assertTrue(rootCause.getMessage().contains("Connection refused")); - } - - cluster.close(); - } - - /** - * Tests that client is correctly sending all overridable per request settings (requestId, batchSize, - * evaluationTimeout, and userAgent) to the server. - */ - @Test - public void shouldSendPerRequestSettingsToServer() { - final Cluster cluster = Cluster.build("localhost").port(settings.PORT) - .minConnectionPoolSize(1) - .maxConnectionPoolSize(1) - .serializer(Serializers.GRAPHSON_V2) - .create(); - final Client.ClusteredClient client = cluster.connect(); - - // trigger the testing server to return captured request settings - String response = client.submit("1", RequestOptions.build() - .overrideRequestId(settings.PER_REQUEST_SETTINGS_REQUEST_ID) - .timeout(1234).userAgent("helloWorld").batchSize(12) - .materializeProperties("tokens").create()).one().getString(); - - String expectedResponse = String.format("requestId=%s evaluationTimeout=%d, batchSize=%d, userAgent=%s, materializeProperties=%s", - settings.PER_REQUEST_SETTINGS_REQUEST_ID, 1234, 12, "helloWorld", "tokens"); - assertEquals(expectedResponse, response); - } -} +//public class WebSocketClientBehaviorIntegrateTest { +// private static final Logger logger = LoggerFactory.getLogger(WebSocketClientBehaviorIntegrateTest.class); +// +// @Rule +// public TestName name = new TestName(); +// +// private static 
LogCaptor logCaptor; +// +// private final SocketServerSettings settings; +// +// private SimpleSocketServer server; +// +// public WebSocketClientBehaviorIntegrateTest() throws IOException { +// settings = SocketServerSettings.read(FileSystems.getDefault().getPath("..","gremlin-tools", "gremlin-socket-server", "conf", "test-ws-gremlin.yaml")); +// settings.SERIALIZER = "GraphSONV2"; +// } +// +// @BeforeClass +// public static void setupLogCaptor() { +// logCaptor = LogCaptor.forRoot(); +// } +// +// @AfterClass +// public static void tearDown() { +// logCaptor.close(); +// } +// +// @Before +// public void setUp() throws InterruptedException { +// logCaptor.clearLogs(); +// +// server = new SimpleSocketServer(settings); +// if (name.getMethodName().equals("shouldAttemptHandshakeForLongerThanDefaultNettySslHandshakeTimeout") || +// name.getMethodName().equals("shouldPrintCorrectErrorForRegularWebSocketHandshakeTimeout")) { +// server.start(new TestChannelizers.TestWSNoOpInitializer()); +// } else if (name.getMethodName().equals("shouldContinueRunningRemainingConnectionsIfServerThrottlesNewConnections") || +// name.getMethodName().equals("shouldReturnCorrectExceptionIfServerThrottlesNewConnectionsAndMaxWaitExceeded")) { +// server.start(new TestChannelizers.TestConnectionThrottlingInitializer(settings)); +// } else { +// server.start(new TestWSGremlinInitializer(settings)); +// } +// } +// +// @After +// public void shutdown() { +// server.stop(); +// } +// +// /** +// * Tests that client is correctly sending user agent during web socket handshake by having the server return +// * the captured user agent. 
+// */ +// @Test +// public void shouldIncludeUserAgentInHandshakeRequest() { +// final Cluster cluster = Cluster.build("localhost").port(settings.PORT) +// .minConnectionPoolSize(1) +// .maxConnectionPoolSize(1) +// .serializer(Serializers.GRAPHSON_V2) +// .create(); +// final Client.ClusteredClient client = cluster.connect(); +// +// // trigger the testing server to return captured user agent +// String returnedUserAgent = client.submit("1", RequestOptions.build() +// .overrideRequestId(settings.USER_AGENT_REQUEST_ID).create()).one().getString(); +// assertEquals(UserAgent.USER_AGENT, returnedUserAgent); +// } +// +// /** +// * Tests that no user agent is sent to server when that behaviour is disabled. +// */ +// @Test +// public void shouldNotIncludeUserAgentInHandshakeRequestIfDisabled() { +// final Cluster cluster = Cluster.build("localhost").port(settings.PORT) +// .minConnectionPoolSize(1) +// .maxConnectionPoolSize(1) +// .serializer(Serializers.GRAPHSON_V2) +// .enableUserAgentOnConnect(false) +// .create(); +// final Client.ClusteredClient client = cluster.connect(); +// +// // trigger the testing server to return captured user agent +// String returnedUserAgent = client.submit("1", RequestOptions.build() +// .overrideRequestId(settings.USER_AGENT_REQUEST_ID).create()).one().getString(); +// assertEquals("", returnedUserAgent); +// } +// +// /** +// * Constructs a deadlock situation when initializing a {@link Client} object in sessionless form that leads to +// * hanging behavior in low resource environments (TINKERPOP-2504) and for certain configurations of the +// * {@link Cluster} object where there are simply not enough threads to properly allow the {@link Host} and its +// * related {@link ConnectionPool} objects to spin up properly - see TINKERPOP-2550. 
+// */ +// @Test +// public void shouldNotDeadlockOnInitialization() throws Exception { +// // it seems you can add the same host more than once so while kinda weird it is helpful in faithfully +// // recreating the deadlock situation, though it can/will happen with just one host. workerPoolSize at +// // "1" also helps faithfully reproduce the problem though it can happen at larger pool sizes depending +// // on the timing/interleaving of tasks. the larger connection pool sizes may not be required given the +// // other settings at play but again, just trying to make sure the deadlock state is consistently produced +// // and a larger pool size will mean more time to elapse scheduling connection creation tasks which may +// // further improve chances of scheduling conflicts that produce the deadlock. +// // +// // to force this test to a fail state, change ClusteredClient.initializeImplementation() to use the +// // standard Cluster.executor rather than the hostExecutor (which is a single threaded independent thread +// // pool used just for the purpose of initializing the hosts). +// final Cluster cluster = Cluster.build("localhost"). +// addContactPoint("localhost"). +// addContactPoint("localhost").port(settings.PORT). +// workerPoolSize(1). +// minConnectionPoolSize(32).maxConnectionPoolSize(32).create(); +// +// final AtomicBoolean failed = new AtomicBoolean(false); +// final ExecutorService executor = Executors.newSingleThreadExecutor(); +// executor.submit(() -> { +// try { +// final Client client = cluster.connect(); +// +// // test will hang in init() where the Host and ConnectionPool are started up +// client.init(); +// } catch (Exception ex) { +// // should not "fail" - just hang and then timeout during the executor shutdown as there is +// // a deadlock state, but we have this here just in case. 
a failed assertion of this value +// // below could be interesting +// logger.error("Client initialization failed with exception which was unexpected", ex); +// failed.set(true); +// } finally { +// cluster.close(); +// } +// }); +// +// executor.shutdown(); +// +// // 30 seconds should be ample time, even for travis. the deadlock state happens quite immediately in +// // testing and in most situations this test should zip by in subsecond pace +// assertThat(executor.awaitTermination(30, TimeUnit.SECONDS), is(true)); +// assertThat(failed.get(), is(false)); +// } +// +// /** +// * Test a scenario when server closes a connection which does not have any active requests. Such connection +// * should be destroyed and replaced by another connection on next request. +// */ +// @Test +// public void shouldRemoveConnectionFromPoolWhenServerClose_WithNoPendingRequests() throws InterruptedException { +// final Cluster cluster = Cluster.build("localhost").port(settings.PORT) +// .minConnectionPoolSize(1) +// .maxConnectionPoolSize(1) +// .serializer(Serializers.GRAPHSON_V2) +// .create(); +// final Client.ClusteredClient client = cluster.connect(); +// +// // Initialize the client preemptively +// client.init(); +// +// // assert number of connections opened +// final ConnectionPool channelPool = client.hostConnectionPools.values().stream().findFirst().get(); +// assertEquals(1, channelPool.getConnectionIDs().size()); +// +// final String originalConnectionID = channelPool.getConnectionIDs().iterator().next(); +// logger.info("On client init ConnectionIDs: " + channelPool.getConnectionIDs()); +// +// // trigger the testing server to send a WS close frame +// Vertex v = client.submit("1", RequestOptions.build() +// .overrideRequestId(settings.SINGLE_VERTEX_DELAYED_CLOSE_CONNECTION_REQUEST_ID).create()) +// .one().getVertex(); +// +// assertNotNull(v); +// +// // assert connection is not closed yet +// assertEquals(1, channelPool.getConnectionIDs().size()); +// +// // wait for 
server to send the close WS frame +// Thread.sleep(6000); +// +// // assert that original connection is not part of the connection pool any more +// assertThat("The original connection should have been closed by the server.", +// channelPool.getConnectionIDs().contains(originalConnectionID), is(false)); +// +// // assert sanity after connection replacement +// v = client.submit("1", +// RequestOptions.build().overrideRequestId(settings.SINGLE_VERTEX_REQUEST_ID).create()) +// .one().getVertex(); +// assertNotNull(v); +// } +// +// /** +// * Tests a scenario in which a faulty connection is replaced by a new connection. +// * Ensures that the creation of a new replacement channel only happens once. +// */ +// @Test +// public void shouldRemoveConnectionFromPoolWhenServerClose_WithPendingRequests() throws InterruptedException, ExecutionException { +// final Cluster cluster = Cluster.build("localhost").port(settings.PORT) +// .minConnectionPoolSize(1) +// .maxConnectionPoolSize(1) +// .serializer(Serializers.GRAPHSON_V2) +// .create(); +// +// final Client.ClusteredClient client = cluster.connect(); +// +// // Initialize the client preemptively +// client.init(); +// +// // assert number of connections opened +// final ConnectionPool channelPool = client.hostConnectionPools.values().stream().findFirst().get(); +// assertEquals(1, channelPool.getConnectionIDs().size()); +// +// // Send two requests in flight. Both should error out. 
+// final CompletableFuture<ResultSet> req1 = client.submitAsync("1", RequestOptions.build() +// .overrideRequestId(settings.CLOSE_CONNECTION_REQUEST_ID).create()); +// final CompletableFuture<ResultSet> req2 = client.submitAsync("1", RequestOptions.build() +// .overrideRequestId(settings.CLOSE_CONNECTION_REQUEST_ID_2).create()); +// +// +// // assert both are sent on same connection +// assertEquals(1, channelPool.getConnectionIDs().size()); +// +// // trigger write for both requests +// req1.get(); +// req2.get(); +// +// // wait for close message to arrive from server +// Thread.sleep(2000); +// +// // Assert that we should consider creating a connection only once, since only one connection is being closed. +// assertEquals(1, logCaptor.getLogs().stream().filter(str -> str.contains("Considering new connection on")).count()); +// +// // assert sanity after connection replacement +// final Vertex v = client.submit("1", +// RequestOptions.build().overrideRequestId(settings.SINGLE_VERTEX_REQUEST_ID).create()) +// .one().getVertex(); +// assertNotNull(v); +// } +// +// /** +// * Tests the scenario when client intentionally closes the connection. In this case, the +// * connection should not be recycled. +// */ +// @Test +// public void shouldNotCreateReplacementConnectionWhenClientClosesConnection() throws ExecutionException, InterruptedException { +// final Cluster cluster = Cluster.build("localhost").port(settings.PORT) +// .minConnectionPoolSize(1) +// .maxConnectionPoolSize(1) +// .serializer(Serializers.GRAPHSON_V2) +// .create(); +// final Client.ClusteredClient client = cluster.connect(); +// +// // Initialize the client preemptively +// client.init(); +// +// // Clearing logCaptor before attempting to close the connection is in response to an issue where this test can +// // be polluted by logs from a previous test when running on slow hardware. 
+// logCaptor.clearLogs(); +// +// // assert number of connections opened +// final ConnectionPool channelPool = client.hostConnectionPools.values().stream().findFirst().get(); +// assertEquals(1, channelPool.getConnectionIDs().size()); +// +// // close the connection pool in an authentic manner +// channelPool.closeAsync().get(); +// +// // wait for channel closure callback to trigger +// Thread.sleep(2000); +// +// assertEquals("OnClose callback should be called but only once", 1, +// logCaptor.getLogs().stream().filter(str -> str.contains("OnChannelClose callback called for channel")).count()); +// +// assertEquals("No new connection creation should be started", 0, +// logCaptor.getLogs().stream().filter(str -> str.contains("Considering new connection on")).count()); +// } +// +// /** +// * (TINKERPOP-2814) Tests to make sure that the SSL handshake is now capped by connectionSetupTimeoutMillis and not +// * the default Netty SSL handshake timeout of 10,000ms. +// */ +// @Test +// public void shouldAttemptHandshakeForLongerThanDefaultNettySslHandshakeTimeout() { +// final Cluster cluster = Cluster.build("localhost").port(settings.PORT) +// .minConnectionPoolSize(1) +// .maxConnectionPoolSize(1) +// .connectionSetupTimeoutMillis(20000) // needs to be larger than 10000ms. +// .enableSsl(true) +// .create(); +// +// final Client.ClusteredClient client = cluster.connect(); +// final long start = System.currentTimeMillis(); +// +// Exception caught = null; +// try { +// client.submit("1"); +// } catch (Exception e) { +// caught = e; +// } finally { +// // Test against 15000ms which should give a big enough buffer to avoid timing issues. 
+// assertTrue(System.currentTimeMillis() - start > 15000); +// assertTrue(caught != null); +// assertTrue(caught instanceof NoHostAvailableException); +// assertTrue(logCaptor.getLogs().stream().anyMatch(str -> str.contains("SSL handshake not completed"))); +// } +// +// cluster.close(); +// } +// +// /** +// * Tests to make sure that the correct error message is logged when a non-SSL connection attempt times out. +// */ +// @Test +// public void shouldPrintCorrectErrorForRegularWebSocketHandshakeTimeout() throws InterruptedException { +// final Cluster cluster = Cluster.build("localhost").port(settings.PORT) +// .minConnectionPoolSize(1) +// .maxConnectionPoolSize(1) +// .connectionSetupTimeoutMillis(120) +// .create(); +// +// final Client.ClusteredClient client = cluster.connect(); +// +// Exception caught = null; +// try { +// client.submit("1"); +// } catch (Exception e) { +// caught = e; +// } finally { +// assertTrue(caught != null); +// assertTrue(caught instanceof NoHostAvailableException); +// Thread.sleep(150); +// assertTrue(logCaptor.getLogs().stream().anyMatch(str -> str.contains("WebSocket handshake not completed"))); +// } +// +// cluster.close(); +// } +// +// /** +// * Tests that if a server throttles new connections (doesn't allow new connections to be made) then all requests +// * will run and complete on the connections that are already open. +// */ +// @Test +// public void shouldContinueRunningRemainingConnectionsIfServerThrottlesNewConnections() throws ExecutionException, InterruptedException, TimeoutException { +// final Cluster cluster = Cluster.build("localhost").port(settings.PORT) +// .minConnectionPoolSize(1) +// .maxConnectionPoolSize(5) +// .maxWaitForConnection(15000) // large value ensures that request will eventually find a connection. 
+// .connectionSetupTimeoutMillis(1000) +// .serializer(Serializers.GRAPHSON_V2) +// .create(); +// +// final Client.ClusteredClient client = cluster.connect(); +// +// final List<CompletableFuture<ResultSet>> results = new ArrayList<CompletableFuture<ResultSet>>(); +// for (int i = 0; i < 5; i++) { +// results.add(client.submitAsync("500")); +// } +// +// for (CompletableFuture<ResultSet> result : results) { +// assertNotNull(result.get(60000, TimeUnit.MILLISECONDS).one().getVertex()); +// } +// +// cluster.close(); +// } +// +// /** +// * Tests that if a server throttles new connections (doesn't allow new connections to be made) then any request +// * that can't find a connection within its maxWaitForConnection will return an informative exception regarding +// * the inability to open new connections. +// */ +// @Test +// public void shouldReturnCorrectExceptionIfServerThrottlesNewConnectionsAndMaxWaitExceeded() { +// final Cluster cluster = Cluster.build("localhost").port(settings.PORT) +// .minConnectionPoolSize(1) +// .maxConnectionPoolSize(5) +// .maxWaitForConnection(250) // small value ensures that requests will return TimeoutException. +// .connectionSetupTimeoutMillis(100) +// .serializer(Serializers.GRAPHSON_V2) +// .create(); +// +// final Client.ClusteredClient client = cluster.connect(); +// +// for (int i = 0; i < 5; i++) { +// try { +// client.submitAsync("3000"); +// } catch (Exception e) { +// final Throwable rootCause = ExceptionHelper.getRootCause(e); +// assertTrue(rootCause instanceof TimeoutException); +// assertTrue(rootCause.getMessage().contains("WebSocket handshake not completed")); +// } +// } +// +// cluster.close(); +// } +// +// /** +// * Tests that the client continues to work if the server temporarily goes down between two requests. 
+// */ +// @Test +// public void shouldContinueRunningIfServerGoesDownTemporarily() throws InterruptedException { +// final Cluster cluster = Cluster.build("localhost").port(settings.PORT) +// .minConnectionPoolSize(1) +// .serializer(Serializers.GRAPHSON_V2) +// .create(); +// +// final Client.ClusteredClient client = cluster.connect(); +// final Object lock = new Object(); +// +// final ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(1); +// scheduledPool.schedule(() -> { +// try { +// server.stopSync(); +// server = new SimpleSocketServer(settings); +// server.start(new TestWSGremlinInitializer(settings)); +// synchronized (lock) { +// lock.notify(); +// } +// } catch (InterruptedException ignored) { +// // Ignored. +// } +// }, 1000, TimeUnit.MILLISECONDS); +// +// synchronized (lock) { +// assertNotNull(client.submit("1").one().getVertex()); +// lock.wait(30000); +// } +// +// assertNotNull(client.submit("1").one().getVertex()); +// +// cluster.close(); +// } +// +// /** +// * Tests that if the host is unavailable then the client will return an exception that contains information about +// * the status of the host. 
+// */ +// @Test +// public void shouldReturnCorrectExceptionIfServerGoesDown() throws InterruptedException { +// final Cluster cluster = Cluster.build("localhost").port(settings.PORT) +// .minConnectionPoolSize(1) +// .maxWaitForConnection(500) +// .connectionSetupTimeoutMillis(100) +// .serializer(Serializers.GRAPHSON_V2) +// .create(); +// +// final Client.ClusteredClient client = cluster.connect(); +// client.submit("1"); +// +// server.stopSync(); +// +// try { +// client.submit("1"); +// } catch (Exception e) { +// final Throwable rootCause = ExceptionHelper.getRootCause(e); +// assertTrue(rootCause instanceof TimeoutException); +// assertTrue(rootCause.getMessage().contains("Connection refused")); +// } +// +// cluster.close(); +// } +// +// /** +// * Tests that client is correctly sending all overridable per request settings (requestId, batchSize, +// * evaluationTimeout, and userAgent) to the server. +// */ +// @Test +// public void shouldSendPerRequestSettingsToServer() { +// final Cluster cluster = Cluster.build("localhost").port(settings.PORT) +// .minConnectionPoolSize(1) +// .maxConnectionPoolSize(1) +// .serializer(Serializers.GRAPHSON_V2) +// .create(); +// final Client.ClusteredClient client = cluster.connect(); +// +// // trigger the testing server to return captured request settings +// String response = client.submit("1", RequestOptions.build() +// .overrideRequestId(settings.PER_REQUEST_SETTINGS_REQUEST_ID) +// .timeout(1234).userAgent("helloWorld").batchSize(12) +// .materializeProperties("tokens").create()).one().getString(); +// +// String expectedResponse = String.format("requestId=%s evaluationTimeout=%d, batchSize=%d, userAgent=%s, materializeProperties=%s", +// settings.PER_REQUEST_SETTINGS_REQUEST_ID, 1234, 12, "helloWorld", "tokens"); +// assertEquals(expectedResponse, response); +// } +//} diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnectionTest.java 
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnectionTest.java index 1b616a9cdd..17fcb0438c 100644 --- a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnectionTest.java +++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnectionTest.java @@ -36,7 +36,6 @@ public class DriverRemoteConnectionTest { @Test public void shouldBuildRequestOptions() { - final UUID requestId = UUID.fromString("34a9f45f-8854-4d33-8b40-92a8171ee495"); final RequestOptions options = DriverRemoteConnection.getRequestOptions( g.with("x"). with("y", 100). @@ -44,7 +43,6 @@ public class DriverRemoteConnectionTest { with(Tokens.ARGS_EVAL_TIMEOUT, 100000L). with(Tokens.ARGS_USER_AGENT, "test"). V().asAdmin().getBytecode()); - assertEquals(requestId, options.getOverrideRequestId().get()); assertEquals(1000, options.getBatchSize().get().intValue()); assertEquals(100000L, options.getTimeout().get().longValue()); assertEquals("test", options.getUserAgent().get()); diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java index f51bd29445..f6aaf77b11 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/driver/ClientConnectionIntegrateTest.java @@ -22,7 +22,6 @@ import ch.qos.logback.classic.Level; import ch.qos.logback.classic.Logger; import io.netty.handler.codec.CorruptedFrameException; import nl.altindag.log.LogCaptor; -import org.apache.tinkerpop.gremlin.util.Tokens; import org.apache.tinkerpop.gremlin.util.message.RequestMessageV4; import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException; import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException; diff --git 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java index 9210f00a34..9c4d59b0e1 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverIntegrateTest.java @@ -29,25 +29,18 @@ import org.apache.tinkerpop.gremlin.driver.Cluster; import org.apache.tinkerpop.gremlin.driver.RequestOptions; import org.apache.tinkerpop.gremlin.driver.Result; import org.apache.tinkerpop.gremlin.driver.ResultSet; -import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException; import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException; import org.apache.tinkerpop.gremlin.driver.exception.ResponseException; import org.apache.tinkerpop.gremlin.driver.handler.GremlinResponseHandler; -import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection; import org.apache.tinkerpop.gremlin.jsr223.ScriptFileGremlinPlugin; -import org.apache.tinkerpop.gremlin.process.traversal.Bytecode; -import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; import org.apache.tinkerpop.gremlin.server.channel.HttpChannelizer; import org.apache.tinkerpop.gremlin.structure.Vertex; import org.apache.tinkerpop.gremlin.structure.io.Storage; import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertex; -import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerFactory; import org.apache.tinkerpop.gremlin.util.ExceptionHelper; import org.apache.tinkerpop.gremlin.util.TimeUtil; -import org.apache.tinkerpop.gremlin.util.Tokens; import org.apache.tinkerpop.gremlin.util.function.FunctionUtils; import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; -import org.apache.tinkerpop.gremlin.util.message.RequestMessageV4; import 
org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV1; import org.apache.tinkerpop.gremlin.util.ser.Serializers; import org.junit.After; @@ -55,8 +48,6 @@ import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; -import org.mockito.ArgumentCaptor; -import org.mockito.Mockito; import org.slf4j.LoggerFactory; import java.awt.*; @@ -72,8 +63,6 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -82,7 +71,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.IntStream; -import static org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource.traversal; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.AllOf.allOf; @@ -98,7 +86,6 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Mockito.verify; /** * Integration tests for gremlin-driver configurations and settings. @@ -594,49 +581,6 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration } } - /** - * This test validates that the session requests are processed in-order on the server. The order of results - * returned to the client might be different though since each result is handled by a different executor thread. 
- */ - @Test - public void shouldProcessSessionRequestsInOrder() throws Exception { - final Cluster cluster = TestClientFactory.open(); - try { - final Client client = cluster.connect(name.getMethodName()); - - final ResultSet first = client.submit("Thread.sleep(5000);g.V().fold().coalesce(unfold(), __.addV('person'))"); - final ResultSet second = client.submit("g.V().count()"); - - final CompletableFuture<List<Result>> futureFirst = first.all(); - final CompletableFuture<List<Result>> futureSecond = second.all(); - - final CountDownLatch latch = new CountDownLatch(2); - final List<Object> results = new ArrayList<>(); - final ExecutorService executor = Executors.newSingleThreadExecutor(); - - futureFirst.thenAcceptAsync(r -> { - results.add(r.get(0).getVertex().label()); - latch.countDown(); - }, executor); - - futureSecond.thenAcceptAsync(r -> { - results.add(r.get(0).getLong()); - latch.countDown(); - }, executor); - - // wait for both results - latch.await(30000, TimeUnit.MILLISECONDS); - - assertThat("Should contain 2 results", results.size() == 2); - assertThat("The numeric result should be 1", results.contains(1L)); - assertThat("The string result contain label person", results.contains("person")); - - executor.shutdown(); - } finally { - cluster.close(); - } - } - @Test public void shouldWaitForAllResultsToArrive() throws Exception { final Cluster cluster = TestClientFactory.open(); @@ -1062,23 +1006,6 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration } } - @Test - public void shouldExecuteScriptInSession() throws Exception { - final Cluster cluster = TestClientFactory.build().create(); - final Client client = cluster.connect(name.getMethodName()); - - final ResultSet results1 = client.submit("x = [1,2,3,4,5,6,7,8,9]"); - assertEquals(9, results1.all().get().size()); - - final ResultSet results2 = client.submit("x[0]+1"); - assertEquals(2, results2.all().get().get(0).getInt()); - - final ResultSet results3 = 
client.submit("x[1]+2"); - assertEquals(4, results3.all().get().get(0).getInt()); - - cluster.close(); - } - @Test public void shouldNotThrowNoSuchElementException() throws Exception { final Cluster cluster = TestClientFactory.open(); @@ -1124,127 +1051,6 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration } } - @Test - public void shouldCloseSession() throws Exception { - final Cluster cluster = TestClientFactory.build().create(); - final Client client = cluster.connect(name.getMethodName()); - - final ResultSet results1 = client.submit("x = [1,2,3,4,5,6,7,8,9]"); - assertEquals(9, results1.all().get().size()); - final ResultSet results2 = client.submit("x[0]+1"); - assertEquals(2, results2.all().get().get(0).getInt()); - - client.close(); - - try { - client.submit("x[0]+1").all().get(); - fail("Should have thrown an exception because the connection is closed"); - } catch (Exception ex) { - final Throwable root = ExceptionHelper.getRootCause(ex); - assertThat(root, instanceOf(IllegalStateException.class)); - } finally { - cluster.close(); - } - } - - @Test - public void shouldExecuteScriptInSessionAssumingDefaultedImports() throws Exception { - final Cluster cluster = TestClientFactory.open(); - final Client client = cluster.connect(name.getMethodName()); - - final ResultSet results1 = client.submit("TinkerFactory.class.name"); - assertEquals(TinkerFactory.class.getName(), results1.all().get().get(0).getString()); - - cluster.close(); - } - - @Test - public void shouldExecuteScriptInSessionOnTransactionalGraph() throws Exception { - - final Cluster cluster = TestClientFactory.open(); - final Client client = cluster.connect(name.getMethodName()); - - final Vertex vertexBeforeTx = client.submit("v=g.addV(\"person\").property(\"name\",\"stephen\").next()").all().get().get(0).getVertex(); - assertEquals("person", vertexBeforeTx.label()); - - final String nameValueFromV = 
client.submit("g.V().values('name').next()").all().get().get(0).getString(); - assertEquals("stephen", nameValueFromV); - - final Vertex vertexFromBinding = client.submit("v").all().get().get(0).getVertex(); - assertEquals("person", vertexFromBinding.label()); - - final Map<String,Object> vertexAfterTx = client.submit("g.V(v).property(\"color\",\"blue\").iterate(); g.tx().commit(); g.V(v).valueMap().by(unfold())").all().get().get(0).get(Map.class); - assertEquals("stephen", vertexAfterTx.get("name")); - assertEquals("blue", vertexAfterTx.get("color")); - - cluster.close(); - } - - @Test - public void shouldExecuteScriptInSessionOnTransactionalWithManualTransactionsGraph() throws Exception { - - final Cluster cluster = TestClientFactory.open(); - final Client client = cluster.connect(name.getMethodName()); - final Client sessionlessClient = cluster.connect(); - client.submit("graph.tx().onReadWrite(Transaction.READ_WRITE_BEHAVIOR.MANUAL);null").all().get(); - client.submit("graph.tx().open()").all().get(); - - final Vertex vertexBeforeTx = client.submit("v=g.addV(\"person\").property(\"name\", \"stephen\").next()").all().get().get(0).getVertex(); - assertEquals("person", vertexBeforeTx.label()); - - final String nameValueFromV = client.submit("g.V().values(\"name\").next()").all().get().get(0).getString(); - assertEquals("stephen", nameValueFromV); - - final Vertex vertexFromBinding = client.submit("v").all().get().get(0).getVertex(); - assertEquals("person", vertexFromBinding.label()); - - client.submit("g.V(v).property(\"color\",\"blue\")").all().get(); - client.submit("g.tx().commit()").all().get(); - - // Run a sessionless request to change transaction.readWriteConsumer back to AUTO - // The will make the next in session request fail if consumers aren't ThreadLocal - sessionlessClient.submit("g.V().next()").all().get(); - - client.submit("g.tx().open()").all().get(); - - final Map<String,Object> vertexAfterTx = 
client.submit("g.V().valueMap().by(unfold())").all().get().get(0).get(Map.class); - assertEquals("stephen", vertexAfterTx.get("name")); - assertEquals("blue", vertexAfterTx.get("color")); - - client.submit("g.tx().rollback()").all().get(); - - cluster.close(); - } - - @Test - public void shouldExecuteInSessionAndSessionlessWithoutOpeningTransaction() throws Exception { - - final Cluster cluster = TestClientFactory.open(); - try { - final Client sessionClient = cluster.connect(name.getMethodName()); - final Client sessionlessClient = cluster.connect(); - - //open transaction in session, then add vertex and commit - sessionClient.submit("g.tx().open()").all().get(); - final Vertex vertexBeforeTx = sessionClient.submit("v=g.addV(\"person\").property(\"name\",\"stephen\").next()").all().get().get(0).getVertex(); - assertEquals("person", vertexBeforeTx.label()); - sessionClient.submit("g.tx().commit()").all().get(); - - // check that session transaction is closed - final boolean isOpen = sessionClient.submit("g.tx().isOpen()").all().get().get(0).getBoolean(); - assertTrue("Transaction should be closed", !isOpen); - - //run a sessionless read - sessionlessClient.submit("g.V()").all().get(); - - // check that session transaction is still closed - final boolean isOpenAfterSessionless = sessionClient.submit("g.tx().isOpen()").all().get().get(0).getBoolean(); - assertTrue("Transaction should stil be closed", !isOpenAfterSessionless); - } finally { - cluster.close(); - } - } - @Test public void shouldExecuteSessionlessScriptOnTransactionalGraph() throws Exception { @@ -1531,105 +1337,6 @@ public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration cluster.close(); } - @Test - public void shouldManageTransactionsInSession() throws Exception { - - final Cluster cluster = TestClientFactory.open(); - final Client client = cluster.connect(); - final Client sessionWithManagedTx = cluster.connect(name.getMethodName() + "-managed", true); - final Client 
sessionWithoutManagedTx = cluster.connect(name.getMethodName() + "-not-managed"); - - // this should auto-commit - sessionWithManagedTx.submit("v = g.addV().property('name','stephen').next()").all().get().get(0).getVertex(); - - // the other clients should see that change because of auto-commit - assertThat(client.submit("g.V().has('name','stephen').hasNext()").all().get().get(0).getBoolean(), is(true)); - assertThat(sessionWithoutManagedTx.submit("g.V().has('name','stephen').hasNext()").all().get().get(0).getBoolean(), is(true)); - - // this should NOT auto-commit - final Vertex vDaniel = sessionWithoutManagedTx.submit("v = g.addV().property('name','daniel').next()").all().get().get(0).getVertex(); - - // the other clients should NOT see that change because of auto-commit - assertThat(client.submit("g.V().has('name','daniel').hasNext()").all().get().get(0).getBoolean(), is(false)); - assertThat(sessionWithManagedTx.submit("g.V().has('name','daniel').hasNext()").all().get().get(0).getBoolean(), is(false)); - - // but "v" should still be there - final Vertex vDanielAgain = sessionWithoutManagedTx.submit("v").all().get().get(0).getVertex(); - assertEquals(vDaniel.id(), vDanielAgain.id()); - - // now commit manually - sessionWithoutManagedTx.submit("g.tx().commit()").all().get(); - - // should be there for all now - assertThat(client.submit("g.V().has('name','daniel').hasNext()").all().get().get(0).getBoolean(), is(true)); - assertThat(sessionWithManagedTx.submit("g.V().has('name','daniel').hasNext()").all().get().get(0).getBoolean(), is(true)); - assertThat(sessionWithoutManagedTx.submit("g.V().has('name','daniel').hasNext()").all().get().get(0).getBoolean(), is(true)); - - cluster.close(); - } - - @Test - public void shouldProcessSessionRequestsInOrderAfterTimeout() throws Exception { - final Cluster cluster = TestClientFactory.open(); - - try { - // this configures the client to behave like OpProcessor for UnifiedChannelizer - final Client.SessionSettings settings 
= Client.SessionSettings.build(). - sessionId(name.getMethodName()).create(); - final Client client = cluster.connect(Client.Settings.build().useSession(settings).create()); - - for (int index = 0; index < 50; index++) { - final CompletableFuture<ResultSet> first = client.submitAsync( - "Object mon1 = 'mon1';\n" + - "synchronized (mon1) {\n" + - " mon1.wait();\n" + - "} "); - - final CompletableFuture<ResultSet> second = client.submitAsync( - "Object mon2 = 'mon2';\n" + - "synchronized (mon2) {\n" + - " mon2.wait();\n" + - "}"); - - final CompletableFuture<ResultSet> third = client.submitAsync( - "Object mon3 = 'mon3';\n" + - "synchronized (mon3) {\n" + - " mon3.wait();\n" + - "}"); - - final CompletableFuture<ResultSet> fourth = client.submitAsync( - "Object mon4 = 'mon4';\n" + - "synchronized (mon4) {\n" + - " mon4.wait();\n" + - "}"); - - final CompletableFuture<List<Result>> futureFirst = first.get().all(); - final CompletableFuture<List<Result>> futureSecond = second.get().all(); - final CompletableFuture<List<Result>> futureThird = third.get().all(); - final CompletableFuture<List<Result>> futureFourth = fourth.get().all(); - - assertFutureTimeout(futureFirst); - assertFutureTimeout(futureSecond); - assertFutureTimeout(futureThird); - assertFutureTimeout(futureFourth); - } - } finally { - cluster.close(); - } - } - - private void assertFutureTimeout(final CompletableFuture<List<Result>> f) { - try { - f.get(); - fail("Should have timed out"); - } catch (Exception ex) { - final Throwable root = ExceptionHelper.getRootCause(ex); - assertThat(root, instanceOf(ResponseException.class)); - assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, ((ResponseException) root).getResponseStatusCode()); - assertThat(root.getMessage(), allOf(startsWith("Evaluation exceeded"), containsString("250 ms"))); - } - } - @Test public void shouldCloseAllClientsOnCloseOfCluster() throws Exception { final Cluster cluster = TestClientFactory.open(); @@ -1822,11 +1529,8 @@ public class 
GremlinDriverIntegrateTest extends AbstractGremlinServerIntegration client.submit("1+1").all().get(3000, TimeUnit.MILLISECONDS); fail("Should throw exception on the retry"); } catch (RuntimeException re2) { - if (client instanceof Client.SessionedClient) { - assertThat(re2.getCause().getCause(), instanceOf(ConnectionException.class)); - } else { - assertThat(re2.getCause().getCause().getCause(), instanceOf(ConnectException.class)); - } + assertThat(re2.getCause().getCause().getCause(), instanceOf(ConnectException.class)); + } // diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/HttpDriverIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/HttpDriverIntegrateTest.java index b70d8c18f7..82aba1cef3 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/HttpDriverIntegrateTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/HttpDriverIntegrateTest.java @@ -40,8 +40,11 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import static org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource.traversal; import static org.hamcrest.CoreMatchers.is; @@ -63,19 +66,6 @@ public class HttpDriverIntegrateTest extends AbstractGremlinServerIntegrationTes return settings; } -// @Test -// public void shouldSubmitScriptWithGraphSON() throws Exception { -// final Cluster cluster = TestClientFactory.build().create(); -// try { -// final Client client = cluster.connect(); -// assertEquals(2, client.submit("1+1").all().get().get(0).getInt()); -// } catch (Exception ex) { -// throw ex; -// } finally { -// cluster.close(); -// } -// } - @Test public void 
shouldSubmitScriptWithGraphBinary() throws Exception { final Cluster cluster = TestClientFactory.build().create(); @@ -90,43 +80,11 @@ public class HttpDriverIntegrateTest extends AbstractGremlinServerIntegrationTes } } -// @Test -// public void shouldSubmitBytecodeWithGraphSON() throws Exception { -// final Cluster cluster = TestClientFactory.build() -// .channelizer(Channelizer.HttpChannelizer.class) -// .serializer(Serializers.GRAPHSON_V4) -// .create(); -// try { -// final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); -// final String result = g.inject("2").toList().get(0); -// assertEquals("2", result); -// } catch (Exception ex) { -// throw ex; -// } finally { -// cluster.close(); -// } -// } - -// @Test -// public void shouldGetErrorForBytecodeWithUntypedGraphSON() throws Exception { -// final Cluster cluster = TestClientFactory.build().create(); -// try { -// final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); -// g.inject("2").toList(); -// fail("Exception expected"); -// } catch (EncoderException ex) { -// assertThat(ex.getMessage(), allOf(containsString("An error occurred during serialization of this request"), -// containsString("it could not be sent to the server - Reason: only GraphSON3 and GraphBinary recommended for serialization of Bytecode requests, but used org.apache.tinkerpop.gremlin."))); -// } finally { -// cluster.close(); -// } -// } - @Test - public void shouldSubmitBytecodeWithGraphBinary() throws Exception { + public void shouldSubmitBytecodeWithGraphBinary() { final Cluster cluster = TestClientFactory.build().create(); try { - final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); + final GraphTraversalSource g = traversal().with(DriverRemoteConnection.using(cluster)); final String result = g.inject("2").toList().get(0); assertEquals("2", result); } catch (Exception ex) { @@ -137,42 +95,72 @@ public class 
HttpDriverIntegrateTest extends AbstractGremlinServerIntegrationTes } @Test - public void shouldFailToUseSession() { + public void shouldSubmitMultipleQueriesWithSameConnection() throws InterruptedException, ExecutionException { final Cluster cluster = TestClientFactory.build().create(); + final Client client = cluster.connect(); + try { - final Client client = cluster.connect("shouldFailToUseSession"); - client.submit("1+1").all().get(); - fail("Can't use session with HTTP"); + final int result = client.submit("Thread.sleep(1000);1").all().get().get(0).getInt(); + assertEquals(1, result); + + final AtomicInteger result2 = new AtomicInteger(-1); + final Thread thread = new Thread(() -> { + try { + result2.set(client.submit("2").all().get().get(0).getInt()); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }); + thread.start(); + thread.join(); + + assertEquals(2, result2.get()); } catch (Exception ex) { - final Throwable t = ExceptionUtils.getRootCause(ex); - assertEquals("Cannot use sessions or tx() with HttpChannelizer", t.getMessage()); + throw ex; } finally { cluster.close(); } } @Test - public void shouldFailToUseTx() { + public void shouldFailToUseSession() { final Cluster cluster = TestClientFactory.build().create(); try { - final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster)); - final Transaction tx = g.tx(); - final GraphTraversalSource gtx = tx.begin(); - gtx.inject("1").toList(); - fail("Can't use tx() with HTTP"); + final Client client = cluster.connect("shouldFailToUseSession"); + client.submit("1+1").all().get(); + fail("Can't use session with HTTP"); } catch (Exception ex) { final Throwable t = ExceptionUtils.getRootCause(ex); - assertEquals("Cannot use sessions or tx() with HttpChannelizer", t.getMessage()); + // assertEquals("Cannot use sessions or tx() with HttpChannelizer", t.getMessage()); + assertEquals("not implemented", t.getMessage()); } finally { 
cluster.close(); } } +// @Test +// public void shouldFailToUseTx() { +// final Cluster cluster = TestClientFactory.build().create(); +// try { +// final GraphTraversalSource g = traversal().with(DriverRemoteConnection.using(cluster)); +// final Transaction tx = g.tx(); +// final GraphTraversalSource gtx = tx.begin(); +// gtx.inject("1").toList(); +// fail("Can't use tx() with HTTP"); +// } catch (Exception ex) { +// final Throwable t = ExceptionUtils.getRootCause(ex); +// // assertEquals("Cannot use sessions or tx() with HttpChannelizer", t.getMessage()); +// assertEquals("not implemented", t.getMessage()); +// } finally { +// cluster.close(); +// } +// } + @Test - public void shouldDeserializeErrorWithGraphBinary() throws Exception { + public void shouldDeserializeErrorWithGraphBinary() { final Cluster cluster = TestClientFactory.build().create(); try { - final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster, "doesNotExist")); + final GraphTraversalSource g = traversal().with(DriverRemoteConnection.using(cluster, "doesNotExist")); g.V().next(); fail("Expected exception to be thrown."); } catch (Exception ex) { @@ -182,20 +170,6 @@ public class HttpDriverIntegrateTest extends AbstractGremlinServerIntegrationTes } } -// @Test -// public void shouldDeserializeErrorWithGraphSON() throws Exception { -// final Cluster cluster = TestClientFactory.build().create(); -// try { -// final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(cluster, "doesNotExist")); -// g.V().next(); -// fail("Expected exception to be thrown."); -// } catch (Exception ex) { -// assert ex.getMessage().contains("Could not rebind"); -// } finally { -// cluster.close(); -// } -// } - @Ignore("driver side error") @Test public void shouldReportErrorWhenRequestCantBeSerialized() throws Exception {
