This is an automated email from the ASF dual-hosted git repository.
xiazcy pushed a commit to branch 3.7-dev
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
The following commit(s) were added to refs/heads/3.7-dev by this push:
new 22db8cf08c Parallel Authentication Fix (#2551)
22db8cf08c is described below
commit 22db8cf08c1b62badd9f5a5a497db454182d16d4
Author: kenhuuu <[email protected]>
AuthorDate: Mon Apr 8 18:59:48 2024 -0700
Parallel Authentication Fix (#2551)
* fix: failing authentication when multiple initial requests are executed
concurrently
* Fix nits in PR for TINKERPOP-3061 regarding sasl authentication.
---------
Co-authored-by: Tiến Nguyễn Khắc <[email protected]>
---
CHANGELOG.asciidoc | 1 +
.../gremlin/driver/simple/AbstractClient.java | 14 +-
.../test/integration/sasl-authentication-tests.js | 13 ++
.../server/handler/SaslAuthenticationHandler.java | 257 +++++++++++++--------
.../tinkerpop/gremlin/server/handler/StateKey.java | 9 +
.../server/GremlinServerAuthIntegrateTest.java | 81 ++++++-
6 files changed, 264 insertions(+), 111 deletions(-)
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 02131c335e..5e91f7ce7e 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -25,6 +25,7 @@
image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
* Deprecated `ltrim()` and `rTrim()` in favor of `l_trim()` and `r_trim` in
Python.
* Fixed bug in `onCreate` for `mergeV()` where use of the `Cardinality`
functions was not properly handled.
+* Fixed authentication failure caused by multiple concurrent initial requests.
[[release-3-7-1]]
=== TinkerPop 3.7.1 (November 20, 2023)
diff --git
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/AbstractClient.java
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/AbstractClient.java
index 00a67b42eb..c353f45c07 100644
---
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/AbstractClient.java
+++
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/AbstractClient.java
@@ -25,10 +25,12 @@ import io.netty.channel.nio.NioEventLoopGroup;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
import org.apache.tinkerpop.gremlin.util.message.ResponseMessage;
-import org.apache.tinkerpop.gremlin.util.message.ResponseStatusCode;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@@ -49,7 +51,7 @@ public abstract class AbstractClient implements SimpleClient {
@Override
public void submit(final RequestMessage requestMessage, final
Consumer<ResponseMessage> callback) throws Exception {
- callbackResponseHandler.callback = callback;
+
callbackResponseHandler.callbackByRequestId.put(requestMessage.getRequestId(),
callback);
writeAndFlush(requestMessage);
}
@@ -65,7 +67,7 @@ public abstract class AbstractClient implements SimpleClient {
public CompletableFuture<List<ResponseMessage>> submitAsync(final
RequestMessage requestMessage) throws Exception {
final List<ResponseMessage> results = new ArrayList<>();
final CompletableFuture<List<ResponseMessage>> f = new
CompletableFuture<>();
- callbackResponseHandler.callback = response -> {
+
callbackResponseHandler.callbackByRequestId.put(requestMessage.getRequestId(),
response -> {
if (f.isDone())
throw new RuntimeException("A terminating message was already
encountered - no more messages should have been received");
@@ -75,7 +77,7 @@ public abstract class AbstractClient implements SimpleClient {
if (response.getStatus().getCode().isFinalResponse()) {
f.complete(results);
}
- };
+ });
writeAndFlush(requestMessage);
@@ -83,11 +85,11 @@ public abstract class AbstractClient implements
SimpleClient {
}
static class CallbackResponseHandler extends
SimpleChannelInboundHandler<ResponseMessage> {
- public Consumer<ResponseMessage> callback;
+ public Map<UUID, Consumer<ResponseMessage>> callbackByRequestId = new
HashMap<>();
@Override
protected void channelRead0(final ChannelHandlerContext
channelHandlerContext, final ResponseMessage response) throws Exception {
- callback.accept(response);
+ callbackByRequestId.get(response.getRequestId()).accept(response);
}
}
}
diff --git
a/gremlin-javascript/src/main/javascript/gremlin-javascript/test/integration/sasl-authentication-tests.js
b/gremlin-javascript/src/main/javascript/gremlin-javascript/test/integration/sasl-authentication-tests.js
index 1fa5850cc2..3d2b937666 100644
---
a/gremlin-javascript/src/main/javascript/gremlin-javascript/test/integration/sasl-authentication-tests.js
+++
b/gremlin-javascript/src/main/javascript/gremlin-javascript/test/integration/sasl-authentication-tests.js
@@ -54,6 +54,19 @@ describe('DriverRemoteConnection', function () {
});
});
+ it('should be able to send multiple requests concurrently with valid
credentials and parse the response', async function () {
+ connection =
helper.getSecureConnectionWithPlainTextSaslAuthenticator(null, 'stephen',
'password');
+
+ const submissions = await Promise.all(
+ Array.from({ length: 10 }).map(() => connection.submit(new
Bytecode().addStep('V', []).addStep('tail', []))),
+ );
+
+ submissions.forEach((response) => {
+ assert.ok(response);
+ assert.ok(response.traversers);
+ });
+ });
+
it('should send the request with invalid credentials and parse the
response error', function () {
connection =
helper.getSecureConnectionWithPlainTextSaslAuthenticator(null, 'Bob',
'password');
diff --git
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAuthenticationHandler.java
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAuthenticationHandler.java
index 07faaa6f51..34345ecf5e 100644
---
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAuthenticationHandler.java
+++
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAuthenticationHandler.java
@@ -22,40 +22,44 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.Attribute;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.Base64;
-import java.util.HashMap;
-import java.util.Map;
-
-import io.netty.util.AttributeMap;
-import org.apache.tinkerpop.gremlin.util.Tokens;
-import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
-import org.apache.tinkerpop.gremlin.util.message.ResponseMessage;
-import org.apache.tinkerpop.gremlin.util.message.ResponseStatusCode;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.tinkerpop.gremlin.server.GremlinServer;
import org.apache.tinkerpop.gremlin.server.Settings;
-import org.apache.tinkerpop.gremlin.server.auth.AuthenticatedUser;
import org.apache.tinkerpop.gremlin.server.auth.AuthenticationException;
import org.apache.tinkerpop.gremlin.server.auth.Authenticator;
import org.apache.tinkerpop.gremlin.server.authz.Authorizer;
import org.apache.tinkerpop.gremlin.server.channel.WebSocketChannelizer;
+import org.apache.tinkerpop.gremlin.util.Tokens;
+import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.util.message.ResponseMessage;
+import org.apache.tinkerpop.gremlin.util.message.ResponseStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.List;
+import java.util.function.Function;
+
/**
* A SASL authentication handler that allows the {@link Authenticator} to be
plugged into it. This handler is meant
* to be used with protocols that process a {@link RequestMessage} such as the
{@link WebSocketChannelizer}
*
- * @author Stephen Mallette (http://stephen.genoprime.com)
+ * @author Stephen Mallette (<a
href="http://stephen.genoprime.com">http://stephen.genoprime.com</a>)
*/
@ChannelHandler.Sharable
public class SaslAuthenticationHandler extends AbstractAuthenticationHandler {
private static final Logger logger =
LoggerFactory.getLogger(SaslAuthenticationHandler.class);
private static final Base64.Decoder BASE64_DECODER = Base64.getDecoder();
private static final Base64.Encoder BASE64_ENCODER = Base64.getEncoder();
+ public static final Duration MAX_REQUEST_DEFERRABLE_DURATION =
Duration.ofSeconds(5);
private static final Logger auditLogger =
LoggerFactory.getLogger(GremlinServer.AUDIT_LOGGER_NAME);
protected final Settings settings;
@@ -75,96 +79,147 @@ public class SaslAuthenticationHandler extends
AbstractAuthenticationHandler {
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg)
throws Exception {
- if (msg instanceof RequestMessage){
- final RequestMessage requestMessage = (RequestMessage) msg;
-
- final Attribute<Authenticator.SaslNegotiator> negotiator =
((AttributeMap) ctx).attr(StateKey.NEGOTIATOR);
- final Attribute<RequestMessage> request = ((AttributeMap)
ctx).attr(StateKey.REQUEST_MESSAGE);
- if (negotiator.get() == null) {
- try {
- // First time through so save the request and send an
AUTHENTICATE challenge with no data
-
negotiator.set(authenticator.newSaslNegotiator(getRemoteInetAddress(ctx)));
- request.set(requestMessage);
- final ResponseMessage authenticate =
ResponseMessage.build(requestMessage)
- .code(ResponseStatusCode.AUTHENTICATE).create();
- ctx.writeAndFlush(authenticate);
- } catch (Exception ex) {
- // newSaslNegotiator can cause troubles - if we don't
catch and respond nicely the driver seems
- // to hang until timeout which isn't so nice. treating
this like a server error as it means that
- // the Authenticator isn't really ready to deal with
requests for some reason.
- logger.error(String.format("%s is not ready to handle
requests - check its configuration or related services",
- authenticator.getClass().getSimpleName()), ex);
-
- final ResponseMessage error =
ResponseMessage.build(requestMessage)
- .statusMessage("Authenticator is not ready to
handle requests")
- .code(ResponseStatusCode.SERVER_ERROR).create();
- ctx.writeAndFlush(error);
- }
- } else {
- if (requestMessage.getOp().equals(Tokens.OPS_AUTHENTICATION)
&& requestMessage.getArgs().containsKey(Tokens.ARGS_SASL)) {
-
- final Object saslObject =
requestMessage.getArgs().get(Tokens.ARGS_SASL);
- final byte[] saslResponse;
-
- if(saslObject instanceof String) {
- saslResponse = BASE64_DECODER.decode((String)
saslObject);
- } else {
- final ResponseMessage error =
ResponseMessage.build(request.get())
- .statusMessage("Incorrect type for : " +
Tokens.ARGS_SASL + " - base64 encoded String is expected")
-
.code(ResponseStatusCode.REQUEST_ERROR_MALFORMED_REQUEST).create();
- ctx.writeAndFlush(error);
- return;
- }
-
- try {
- final byte[] saslMessage =
negotiator.get().evaluateResponse(saslResponse);
- if (negotiator.get().isComplete()) {
- final AuthenticatedUser user =
negotiator.get().getAuthenticatedUser();
-
ctx.channel().attr(StateKey.AUTHENTICATED_USER).set(user);
- // User name logged with the remote socket address
and authenticator classname for audit logging
- if (settings.enableAuditLog) {
- String address =
ctx.channel().remoteAddress().toString();
- if (address.startsWith("/") &&
address.length() > 1) address = address.substring(1);
- final String[] authClassParts =
authenticator.getClass().toString().split("[.]");
- auditLogger.info("User {} with address {}
authenticated by {}",
- user.getName(), address,
authClassParts[authClassParts.length - 1]);
- }
- // If we have got here we are authenticated so
remove the handler and pass
- // the original message down the pipeline for
processing
- ctx.pipeline().remove(this);
- final RequestMessage original = request.get();
- ctx.fireChannelRead(original);
- } else {
- // not done here - send back the sasl message for
next challenge.
- final Map<String,Object> metadata = new
HashMap<>();
- metadata.put(Tokens.ARGS_SASL,
BASE64_ENCODER.encodeToString(saslMessage));
- final ResponseMessage authenticate =
ResponseMessage.build(requestMessage)
- .statusAttributes(metadata)
-
.code(ResponseStatusCode.AUTHENTICATE).create();
- ctx.writeAndFlush(authenticate);
- }
- } catch (AuthenticationException ae) {
- final ResponseMessage error =
ResponseMessage.build(request.get())
- .statusMessage(ae.getMessage())
-
.code(ResponseStatusCode.UNAUTHORIZED).create();
- ctx.writeAndFlush(error);
- }
- } else {
- final ResponseMessage error =
ResponseMessage.build(requestMessage)
- .statusMessage("Failed to authenticate")
- .code(ResponseStatusCode.UNAUTHORIZED).create();
- ctx.writeAndFlush(error);
- }
- }
- } else {
+ if (!(msg instanceof RequestMessage)) {
logger.warn("{} only processes RequestMessage instances - received
{} - channel closing",
this.getClass().getSimpleName(), msg.getClass());
ctx.close();
+ return;
+ }
+
+ final RequestMessage requestMessage = (RequestMessage) msg;
+
+ final Attribute<Authenticator.SaslNegotiator> negotiator =
ctx.channel().attr(StateKey.NEGOTIATOR);
+ final Attribute<RequestMessage> request =
ctx.channel().attr(StateKey.REQUEST_MESSAGE);
+ final Attribute<Pair<LocalDateTime, List<RequestMessage>>>
deferredRequests = ctx.channel().attr(StateKey.DEFERRED_REQUEST_MESSAGES);
+
+ if (negotiator.get() == null) {
+ try {
+ // First time through so save the request and send an
AUTHENTICATE challenge with no data
+
negotiator.set(authenticator.newSaslNegotiator(getRemoteInetAddress(ctx)));
+ request.set(requestMessage);
+ final ResponseMessage authenticate =
ResponseMessage.build(requestMessage)
+ .code(ResponseStatusCode.AUTHENTICATE).create();
+ ctx.writeAndFlush(authenticate);
+ } catch (Exception ex) {
+ // newSaslNegotiator can cause troubles - if we don't catch
and respond nicely the driver seems
+ // to hang until timeout which isn't so nice. treating this
like a server error as it means that
+ // the Authenticator isn't really ready to deal with requests
for some reason.
+ logger.error(String.format("%s is not ready to handle requests
- check its configuration or related services",
+ authenticator.getClass().getSimpleName()), ex);
+
+ respondWithError(
+ requestMessage,
+ builder -> builder.statusMessage("Authenticator is not
ready to handle requests").code(ResponseStatusCode.SERVER_ERROR),
+ ctx);
+ }
+
+ return;
+ } else if (!requestMessage.getOp().equals(Tokens.OPS_AUTHENTICATION)) {
+ // If authentication negotiation is pending, store subsequent
non-authentication requests for later processing
+ deferredRequests.setIfAbsent(new
ImmutablePair<>(LocalDateTime.now(), new ArrayList<>()));
+ deferredRequests.get().getValue().add(requestMessage);
+
+ final Duration deferredDuration =
Duration.between(deferredRequests.get().getKey(), LocalDateTime.now());
+
+ if (deferredDuration.compareTo(MAX_REQUEST_DEFERRABLE_DURATION) >
0) {
+ respondWithError(
+ requestMessage,
+ builder -> builder.statusMessage("Authentication did
not finish in the allowed duration (" + MAX_REQUEST_DEFERRABLE_DURATION + "s).")
+ .code(ResponseStatusCode.UNAUTHORIZED),
+ ctx);
+ return;
+ }
+
+ return;
+ } else if (!requestMessage.getArgs().containsKey(Tokens.ARGS_SASL)) {
+ // This is an authentication request that is missing a "sasl"
argument.
+ respondWithError(
+ requestMessage,
+ builder -> builder.statusMessage("Failed to
authenticate").code(ResponseStatusCode.UNAUTHORIZED),
+ ctx);
+ return;
+ }
+
+ final Object saslObject =
requestMessage.getArgs().get(Tokens.ARGS_SASL);
+
+ if (!(saslObject instanceof String)) {
+ respondWithError(
+ requestMessage,
+ builder -> builder
+ .statusMessage("Incorrect type for : " +
Tokens.ARGS_SASL + " - base64 encoded String is expected")
+
.code(ResponseStatusCode.REQUEST_ERROR_MALFORMED_REQUEST),
+ ctx);
+ return;
}
+
+ try {
+ final byte[] saslResponse = BASE64_DECODER.decode((String)
saslObject);
+ final byte[] saslMessage =
negotiator.get().evaluateResponse(saslResponse);
+
+ if (!negotiator.get().isComplete()) {
+ // not done here - send back the sasl message for next
challenge.
+ final HashMap<String, Object> metadata = new HashMap<>();
+ metadata.put(Tokens.ARGS_SASL,
BASE64_ENCODER.encodeToString(saslMessage));
+ final ResponseMessage authenticate =
ResponseMessage.build(requestMessage)
+ .statusAttributes(metadata)
+ .code(ResponseStatusCode.AUTHENTICATE).create();
+ ctx.writeAndFlush(authenticate);
+ return;
+ }
+
+ final org.apache.tinkerpop.gremlin.server.auth.AuthenticatedUser
user = negotiator.get().getAuthenticatedUser();
+ ctx.channel().attr(StateKey.AUTHENTICATED_USER).set(user);
+ // User name logged with the remote socket address and
authenticator classname for audit logging
+ if (settings.enableAuditLog) {
+ String address = ctx.channel().remoteAddress().toString();
+ if (address.startsWith("/") && address.length() > 1) address =
address.substring(1);
+ final String[] authClassParts =
authenticator.getClass().toString().split("[.]");
+ auditLogger.info("User {} with address {} authenticated by {}",
+ user.getName(), address,
authClassParts[authClassParts.length - 1]);
+ }
+ // If we have got here we are authenticated so remove the handler
and pass
+ // the original message down the pipeline for processing
+ ctx.pipeline().remove(this);
+ final RequestMessage original = request.get();
+ ctx.fireChannelRead(original);
+
+ // Also send deferred requests if there are any down the pipeline
for processing
+ if (deferredRequests.get() != null) {
+
deferredRequests.getAndSet(null).getValue().forEach(ctx::fireChannelRead);
+ }
+ } catch (AuthenticationException ae) {
+ respondWithError(
+ requestMessage,
+ builder ->
builder.statusMessage(ae.getMessage()).code(ResponseStatusCode.UNAUTHORIZED),
+ ctx);
+ }
+ }
+
+ private void respondWithError(final RequestMessage requestMessage, final
Function<ResponseMessage.Builder, ResponseMessage.Builder> buildResponse, final
ChannelHandlerContext ctx) {
+ final Attribute<RequestMessage> originalRequest =
ctx.channel().attr(StateKey.REQUEST_MESSAGE);
+ final Attribute<Pair<LocalDateTime, List<RequestMessage>>>
deferredRequests = ctx.channel().attr(StateKey.DEFERRED_REQUEST_MESSAGES);
+
+ if (!requestMessage.getOp().equals(Tokens.OPS_AUTHENTICATION)) {
+
ctx.write(buildResponse.apply(ResponseMessage.build(requestMessage)).create());
+ }
+
+ if (originalRequest.get() != null) {
+
ctx.write(buildResponse.apply(ResponseMessage.build(originalRequest.get())).create());
+ }
+
+ if (deferredRequests.get() != null) {
+ deferredRequests
+ .getAndSet(null).getValue().stream()
+ .map(ResponseMessage::build)
+ .map(buildResponse)
+ .map(ResponseMessage.Builder::create)
+ .forEach(ctx::write);
+ }
+
+ ctx.flush();
}
- private InetAddress getRemoteInetAddress(final ChannelHandlerContext ctx)
- {
+ private InetAddress getRemoteInetAddress(final ChannelHandlerContext ctx) {
final Channel channel = ctx.channel();
if (null == channel)
@@ -172,9 +227,9 @@ public class SaslAuthenticationHandler extends
AbstractAuthenticationHandler {
final SocketAddress genericSocketAddr = channel.remoteAddress();
- if (null == genericSocketAddr || !(genericSocketAddr instanceof
InetSocketAddress))
+ if (!(genericSocketAddr instanceof InetSocketAddress))
return null;
- return ((InetSocketAddress)genericSocketAddr).getAddress();
+ return ((InetSocketAddress) genericSocketAddr).getAddress();
}
}
diff --git
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/StateKey.java
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/StateKey.java
index cfb2e320ad..4f693942e1 100644
---
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/StateKey.java
+++
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/StateKey.java
@@ -18,6 +18,7 @@
*/
package org.apache.tinkerpop.gremlin.server.handler;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.tinkerpop.gremlin.util.MessageSerializer;
import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
import org.apache.tinkerpop.gremlin.server.auth.AuthenticatedUser;
@@ -25,6 +26,9 @@ import org.apache.tinkerpop.gremlin.server.auth.Authenticator;
import org.apache.tinkerpop.gremlin.server.op.session.Session;
import io.netty.util.AttributeKey;
+import java.time.LocalDateTime;
+import java.util.List;
+
/**
* Keys used in the various handlers to store state in the pipeline.
*
@@ -59,6 +63,11 @@ public final class StateKey {
*/
public static final AttributeKey<RequestMessage> REQUEST_MESSAGE =
AttributeKey.valueOf("request");
+ /**
+ * The key for the deferred requests.
+ */
+ public static final AttributeKey<Pair<LocalDateTime,
List<RequestMessage>>> DEFERRED_REQUEST_MESSAGES =
AttributeKey.valueOf("deferredRequests");
+
/**
* The key for the current {@link AuthenticatedUser}.
*/
diff --git
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java
index e39d8c2b44..01367ec49c 100644
---
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java
+++
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java
@@ -18,23 +18,34 @@
*/
package org.apache.tinkerpop.gremlin.server;
-import org.apache.tinkerpop.gremlin.util.ExceptionHelper;
import org.apache.tinkerpop.gremlin.driver.Client;
import org.apache.tinkerpop.gremlin.driver.Cluster;
+import org.apache.tinkerpop.gremlin.driver.Result;
+import org.apache.tinkerpop.gremlin.driver.ResultSet;
import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException;
import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
+import org.apache.tinkerpop.gremlin.driver.simple.WebSocketClient;
import org.apache.tinkerpop.gremlin.server.auth.SimpleAuthenticator;
-import org.ietf.jgss.GSSException;
+import org.apache.tinkerpop.gremlin.server.handler.SaslAuthenticationHandler;
import org.apache.tinkerpop.gremlin.structure.Property;
import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.util.ExceptionHelper;
+import org.apache.tinkerpop.gremlin.util.Tokens;
+import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.util.message.ResponseMessage;
+import org.apache.tinkerpop.gremlin.util.message.ResponseStatusCode;
+import org.apache.tinkerpop.gremlin.util.ser.Serializers;
+import org.ietf.jgss.GSSException;
import org.junit.Test;
+import java.time.Duration;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-
-import org.apache.tinkerpop.gremlin.util.ser.Serializers;
+import java.util.concurrent.TimeUnit;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.AnyOf.anyOf;
@@ -163,6 +174,68 @@ public class GremlinServerAuthIntegrateTest extends
AbstractGremlinServerIntegra
}
}
+ @Test
+ public void
shouldFailAuthenticateWithUnAuthenticatedRequestAfterMaxDeferrableDuration()
throws Exception {
+ try (WebSocketClient client =
TestClientFactory.createWebSocketClient()) {
+ // First request will initiate the authentication handshake
+ // Subsequent requests will be deferred
+ CompletableFuture<List<ResponseMessage>>
futureOfRequestWithinAuthDuration1 = client.submitAsync("");
+ CompletableFuture<List<ResponseMessage>>
futureOfRequestWithinAuthDuration2 = client.submitAsync("");
+ CompletableFuture<List<ResponseMessage>>
futureOfRequestWithinAuthDuration3 = client.submitAsync("");
+
+ // After the maximum allowed deferred request duration,
+ // any non-authenticated request will invalidate all requests with
an UNAUTHORIZED error
+ CompletableFuture<List<ResponseMessage>>
futureOfRequestSubmittedTooLate = CompletableFuture.runAsync(() -> {
+ try {
+
Thread.sleep(SaslAuthenticationHandler.MAX_REQUEST_DEFERRABLE_DURATION.plus(Duration.ofSeconds(1)).toMillis());
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }).thenCompose((__) -> {
+ try {
+ return client.submitAsync("");
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ assertEquals(2, futureOfRequestWithinAuthDuration1.get().size());
+ assertEquals(1, futureOfRequestWithinAuthDuration2.get().size());
+ assertEquals(1, futureOfRequestWithinAuthDuration3.get().size());
+ assertEquals(1, futureOfRequestSubmittedTooLate.get().size());
+
+ assertEquals(ResponseStatusCode.AUTHENTICATE,
futureOfRequestWithinAuthDuration1.get().get(0).getStatus().getCode());
+ assertEquals(ResponseStatusCode.UNAUTHORIZED,
futureOfRequestWithinAuthDuration1.get().get(1).getStatus().getCode());
+ assertEquals(ResponseStatusCode.UNAUTHORIZED,
futureOfRequestWithinAuthDuration2.get().get(0).getStatus().getCode());
+ assertEquals(ResponseStatusCode.UNAUTHORIZED,
futureOfRequestWithinAuthDuration3.get().get(0).getStatus().getCode());
+ assertEquals(ResponseStatusCode.UNAUTHORIZED,
futureOfRequestSubmittedTooLate.get().get(0).getStatus().getCode());
+ }
+ }
+
+ @Test
+ public void shouldFailAuthenticateWithIncorrectParallelRequests() throws
Exception {
+ try (WebSocketClient client =
TestClientFactory.createWebSocketClient()) {
+
+ CompletableFuture<List<ResponseMessage>> firstRequest =
client.submitAsync("1");
+ CompletableFuture<List<ResponseMessage>> secondRequest =
client.submitAsync("2");
+ CompletableFuture<List<ResponseMessage>> thirdRequest =
client.submitAsync("3");
+
+ Thread.sleep(500);
+
+ // send some incorrect value for username password which should
cause all requests to fail.
+
client.submitAsync(RequestMessage.build(Tokens.OPS_AUTHENTICATION).addArg(Tokens.ARGS_SASL,
"someincorrectvalue").create());
+
+ assertEquals(2, firstRequest.get().size());
+ assertEquals(1, secondRequest.get().size());
+ assertEquals(1, thirdRequest.get().size());
+
+ assertEquals(ResponseStatusCode.AUTHENTICATE,
firstRequest.get().get(0).getStatus().getCode());
+ assertEquals(ResponseStatusCode.UNAUTHORIZED,
firstRequest.get().get(1).getStatus().getCode());
+ assertEquals(ResponseStatusCode.UNAUTHORIZED,
secondRequest.get().get(0).getStatus().getCode());
+ assertEquals(ResponseStatusCode.UNAUTHORIZED,
thirdRequest.get().get(0).getStatus().getCode());
+ }
+ }
+
@Test
public void shouldAuthenticateWithPlainTextOverDefaultJSONSerialization()
throws Exception {
final Cluster cluster =
TestClientFactory.build().serializer(Serializers.GRAPHSON)