This is an automated email from the ASF dual-hosted git repository.

xiazcy pushed a commit to branch 3.7-dev
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/3.7-dev by this push:
     new 22db8cf08c Parallel Authentication Fix (#2551)
22db8cf08c is described below

commit 22db8cf08c1b62badd9f5a5a497db454182d16d4
Author: kenhuuu <[email protected]>
AuthorDate: Mon Apr 8 18:59:48 2024 -0700

    Parallel Authentication Fix (#2551)
    
    * fix: failing authentication when multiple initially requests are executed 
concurrently
    
    * Fix nits in PR for TINKERPOP-3061 regarding sasl authentication.
    
    ---------
    
    Co-authored-by: Tiến Nguyễn Khắc <[email protected]>
---
 CHANGELOG.asciidoc                                 |   1 +
 .../gremlin/driver/simple/AbstractClient.java      |  14 +-
 .../test/integration/sasl-authentication-tests.js  |  13 ++
 .../server/handler/SaslAuthenticationHandler.java  | 257 +++++++++++++--------
 .../tinkerpop/gremlin/server/handler/StateKey.java |   9 +
 .../server/GremlinServerAuthIntegrateTest.java     |  81 ++++++-
 6 files changed, 264 insertions(+), 111 deletions(-)

diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 02131c335e..5e91f7ce7e 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -25,6 +25,7 @@ 
image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
 
 * Deprecated `ltrim()` and `rTrim()` in favor of `l_trim()` and `r_trim` in 
Python.
 * Fixed bug in `onCreate` for `mergeV()` where use of the `Cardinality` 
functions was not properly handled.
+* Fixed multiple concurrent initially requests caused authentication to fail.
 
 [[release-3-7-1]]
 === TinkerPop 3.7.1 (November 20, 2023)
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/AbstractClient.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/AbstractClient.java
index 00a67b42eb..c353f45c07 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/AbstractClient.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/simple/AbstractClient.java
@@ -25,10 +25,12 @@ import io.netty.channel.nio.NioEventLoopGroup;
 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
 import org.apache.tinkerpop.gremlin.util.message.ResponseMessage;
-import org.apache.tinkerpop.gremlin.util.message.ResponseStatusCode;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
@@ -49,7 +51,7 @@ public abstract class AbstractClient implements SimpleClient {
 
     @Override
     public void submit(final RequestMessage requestMessage, final 
Consumer<ResponseMessage> callback) throws Exception {
-        callbackResponseHandler.callback = callback;
+        
callbackResponseHandler.callbackByRequestId.put(requestMessage.getRequestId(), 
callback);
         writeAndFlush(requestMessage);
     }
 
@@ -65,7 +67,7 @@ public abstract class AbstractClient implements SimpleClient {
     public CompletableFuture<List<ResponseMessage>> submitAsync(final 
RequestMessage requestMessage) throws Exception {
         final List<ResponseMessage> results = new ArrayList<>();
         final CompletableFuture<List<ResponseMessage>> f = new 
CompletableFuture<>();
-        callbackResponseHandler.callback = response -> {
+        
callbackResponseHandler.callbackByRequestId.put(requestMessage.getRequestId(), 
response -> {
             if (f.isDone())
                 throw new RuntimeException("A terminating message was already 
encountered - no more messages should have been received");
 
@@ -75,7 +77,7 @@ public abstract class AbstractClient implements SimpleClient {
             if (response.getStatus().getCode().isFinalResponse()) {
                 f.complete(results);
             }
-        };
+        });
 
         writeAndFlush(requestMessage);
 
@@ -83,11 +85,11 @@ public abstract class AbstractClient implements 
SimpleClient {
     }
 
     static class CallbackResponseHandler extends 
SimpleChannelInboundHandler<ResponseMessage> {
-        public Consumer<ResponseMessage> callback;
+        public Map<UUID, Consumer<ResponseMessage>> callbackByRequestId = new 
HashMap<>();
 
         @Override
         protected void channelRead0(final ChannelHandlerContext 
channelHandlerContext, final ResponseMessage response) throws Exception {
-            callback.accept(response);
+            callbackByRequestId.get(response.getRequestId()).accept(response);
         }
     }
 }
diff --git 
a/gremlin-javascript/src/main/javascript/gremlin-javascript/test/integration/sasl-authentication-tests.js
 
b/gremlin-javascript/src/main/javascript/gremlin-javascript/test/integration/sasl-authentication-tests.js
index 1fa5850cc2..3d2b937666 100644
--- 
a/gremlin-javascript/src/main/javascript/gremlin-javascript/test/integration/sasl-authentication-tests.js
+++ 
b/gremlin-javascript/src/main/javascript/gremlin-javascript/test/integration/sasl-authentication-tests.js
@@ -54,6 +54,19 @@ describe('DriverRemoteConnection', function () {
           });
       });
 
+      it('should be able to send multiple requests concurrently with valid 
credentials and parse the response', async function () {
+        connection = 
helper.getSecureConnectionWithPlainTextSaslAuthenticator(null, 'stephen', 
'password');
+
+        const submissions = await Promise.all(
+          Array.from({ length: 10 }).map(() => connection.submit(new 
Bytecode().addStep('V', []).addStep('tail', []))),
+        );
+
+        submissions.forEach((response) => {
+          assert.ok(response);
+          assert.ok(response.traversers);
+        });
+      });
+
       it('should send the request with invalid credentials and parse the 
response error', function () {
         connection = 
helper.getSecureConnectionWithPlainTextSaslAuthenticator(null, 'Bob', 
'password');
 
diff --git 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAuthenticationHandler.java
 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAuthenticationHandler.java
index 07faaa6f51..34345ecf5e 100644
--- 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAuthenticationHandler.java
+++ 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/SaslAuthenticationHandler.java
@@ -22,40 +22,44 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.util.Attribute;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.Base64;
-import java.util.HashMap;
-import java.util.Map;
-
-import io.netty.util.AttributeMap;
-import org.apache.tinkerpop.gremlin.util.Tokens;
-import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
-import org.apache.tinkerpop.gremlin.util.message.ResponseMessage;
-import org.apache.tinkerpop.gremlin.util.message.ResponseStatusCode;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.tinkerpop.gremlin.server.GremlinServer;
 import org.apache.tinkerpop.gremlin.server.Settings;
-import org.apache.tinkerpop.gremlin.server.auth.AuthenticatedUser;
 import org.apache.tinkerpop.gremlin.server.auth.AuthenticationException;
 import org.apache.tinkerpop.gremlin.server.auth.Authenticator;
 import org.apache.tinkerpop.gremlin.server.authz.Authorizer;
 import org.apache.tinkerpop.gremlin.server.channel.WebSocketChannelizer;
+import org.apache.tinkerpop.gremlin.util.Tokens;
+import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.util.message.ResponseMessage;
+import org.apache.tinkerpop.gremlin.util.message.ResponseStatusCode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.List;
+import java.util.function.Function;
+
 /**
  * A SASL authentication handler that allows the {@link Authenticator} to be 
plugged into it. This handler is meant
  * to be used with protocols that process a {@link RequestMessage} such as the 
{@link WebSocketChannelizer}
  *
- * @author Stephen Mallette (http://stephen.genoprime.com)
+ * @author Stephen Mallette (<a 
href="http://stephen.genoprime.com";>http://stephen.genoprime.com</a>)
  */
 @ChannelHandler.Sharable
 public class SaslAuthenticationHandler extends AbstractAuthenticationHandler {
     private static final Logger logger = 
LoggerFactory.getLogger(SaslAuthenticationHandler.class);
     private static final Base64.Decoder BASE64_DECODER = Base64.getDecoder();
     private static final Base64.Encoder BASE64_ENCODER = Base64.getEncoder();
+    public static final Duration MAX_REQUEST_DEFERRABLE_DURATION = 
Duration.ofSeconds(5);
     private static final Logger auditLogger = 
LoggerFactory.getLogger(GremlinServer.AUDIT_LOGGER_NAME);
 
     protected final Settings settings;
@@ -75,96 +79,147 @@ public class SaslAuthenticationHandler extends 
AbstractAuthenticationHandler {
 
     @Override
     public void channelRead(final ChannelHandlerContext ctx, final Object msg) 
throws Exception {
-        if (msg instanceof RequestMessage){
-            final RequestMessage requestMessage = (RequestMessage) msg;
-
-            final Attribute<Authenticator.SaslNegotiator> negotiator = 
((AttributeMap) ctx).attr(StateKey.NEGOTIATOR);
-            final Attribute<RequestMessage> request = ((AttributeMap) 
ctx).attr(StateKey.REQUEST_MESSAGE);
-            if (negotiator.get() == null) {
-                try {
-                    // First time through so save the request and send an 
AUTHENTICATE challenge with no data
-                    
negotiator.set(authenticator.newSaslNegotiator(getRemoteInetAddress(ctx)));
-                    request.set(requestMessage);
-                    final ResponseMessage authenticate = 
ResponseMessage.build(requestMessage)
-                            .code(ResponseStatusCode.AUTHENTICATE).create();
-                    ctx.writeAndFlush(authenticate);
-                } catch (Exception ex) {
-                    // newSaslNegotiator can cause troubles - if we don't 
catch and respond nicely the driver seems
-                    // to hang until timeout which isn't so nice. treating 
this like a server error as it means that
-                    // the Authenticator isn't really ready to deal with 
requests for some reason.
-                    logger.error(String.format("%s is not ready to handle 
requests - check its configuration or related services",
-                            authenticator.getClass().getSimpleName()), ex);
-
-                    final ResponseMessage error = 
ResponseMessage.build(requestMessage)
-                            .statusMessage("Authenticator is not ready to 
handle requests")
-                            .code(ResponseStatusCode.SERVER_ERROR).create();
-                    ctx.writeAndFlush(error);
-                }
-            } else {
-                if (requestMessage.getOp().equals(Tokens.OPS_AUTHENTICATION) 
&& requestMessage.getArgs().containsKey(Tokens.ARGS_SASL)) {
-                    
-                    final Object saslObject = 
requestMessage.getArgs().get(Tokens.ARGS_SASL);
-                    final byte[] saslResponse;
-                    
-                    if(saslObject instanceof String) {
-                        saslResponse = BASE64_DECODER.decode((String) 
saslObject);
-                    } else {
-                        final ResponseMessage error = 
ResponseMessage.build(request.get())
-                                .statusMessage("Incorrect type for : " + 
Tokens.ARGS_SASL + " - base64 encoded String is expected")
-                                
.code(ResponseStatusCode.REQUEST_ERROR_MALFORMED_REQUEST).create();
-                        ctx.writeAndFlush(error);
-                        return;
-                    }
-
-                    try {
-                        final byte[] saslMessage = 
negotiator.get().evaluateResponse(saslResponse);
-                        if (negotiator.get().isComplete()) {
-                            final AuthenticatedUser user = 
negotiator.get().getAuthenticatedUser();
-                            
ctx.channel().attr(StateKey.AUTHENTICATED_USER).set(user);
-                            // User name logged with the remote socket address 
and authenticator classname for audit logging
-                            if (settings.enableAuditLog) {
-                                String address = 
ctx.channel().remoteAddress().toString();
-                                if (address.startsWith("/") && 
address.length() > 1) address = address.substring(1);
-                                final String[] authClassParts = 
authenticator.getClass().toString().split("[.]");
-                                auditLogger.info("User {} with address {} 
authenticated by {}",
-                                        user.getName(), address, 
authClassParts[authClassParts.length - 1]);
-                            }
-                            // If we have got here we are authenticated so 
remove the handler and pass
-                            // the original message down the pipeline for 
processing
-                            ctx.pipeline().remove(this);
-                            final RequestMessage original = request.get();
-                            ctx.fireChannelRead(original);
-                        } else {
-                            // not done here - send back the sasl message for 
next challenge.
-                            final Map<String,Object> metadata = new 
HashMap<>();
-                            metadata.put(Tokens.ARGS_SASL, 
BASE64_ENCODER.encodeToString(saslMessage));
-                            final ResponseMessage authenticate = 
ResponseMessage.build(requestMessage)
-                                    .statusAttributes(metadata)
-                                    
.code(ResponseStatusCode.AUTHENTICATE).create();
-                            ctx.writeAndFlush(authenticate);
-                        }
-                    } catch (AuthenticationException ae) {
-                        final ResponseMessage error = 
ResponseMessage.build(request.get())
-                                .statusMessage(ae.getMessage())
-                                
.code(ResponseStatusCode.UNAUTHORIZED).create();
-                        ctx.writeAndFlush(error);
-                    }
-                } else {
-                    final ResponseMessage error = 
ResponseMessage.build(requestMessage)
-                            .statusMessage("Failed to authenticate")
-                            .code(ResponseStatusCode.UNAUTHORIZED).create();
-                    ctx.writeAndFlush(error);
-                }
-            }
-        } else {
+        if (!(msg instanceof RequestMessage)) {
             logger.warn("{} only processes RequestMessage instances - received 
{} - channel closing",
                     this.getClass().getSimpleName(), msg.getClass());
             ctx.close();
+            return;
+        }
+
+        final RequestMessage requestMessage = (RequestMessage) msg;
+
+        final Attribute<Authenticator.SaslNegotiator> negotiator = 
ctx.channel().attr(StateKey.NEGOTIATOR);
+        final Attribute<RequestMessage> request = 
ctx.channel().attr(StateKey.REQUEST_MESSAGE);
+        final Attribute<Pair<LocalDateTime, List<RequestMessage>>> 
deferredRequests = ctx.channel().attr(StateKey.DEFERRED_REQUEST_MESSAGES);
+
+        if (negotiator.get() == null) {
+            try {
+                // First time through so save the request and send an 
AUTHENTICATE challenge with no data
+                
negotiator.set(authenticator.newSaslNegotiator(getRemoteInetAddress(ctx)));
+                request.set(requestMessage);
+                final ResponseMessage authenticate = 
ResponseMessage.build(requestMessage)
+                        .code(ResponseStatusCode.AUTHENTICATE).create();
+                ctx.writeAndFlush(authenticate);
+            } catch (Exception ex) {
+                // newSaslNegotiator can cause troubles - if we don't catch 
and respond nicely the driver seems
+                // to hang until timeout which isn't so nice. treating this 
like a server error as it means that
+                // the Authenticator isn't really ready to deal with requests 
for some reason.
+                logger.error(String.format("%s is not ready to handle requests 
- check its configuration or related services",
+                        authenticator.getClass().getSimpleName()), ex);
+
+                respondWithError(
+                        requestMessage,
+                        builder -> builder.statusMessage("Authenticator is not 
ready to handle requests").code(ResponseStatusCode.SERVER_ERROR),
+                        ctx);
+            }
+
+            return;
+        } else if (!requestMessage.getOp().equals(Tokens.OPS_AUTHENTICATION)) {
+            // If authentication negotiation is pending, store subsequent 
non-authentication requests for later processing
+            deferredRequests.setIfAbsent(new 
ImmutablePair<>(LocalDateTime.now(), new ArrayList<>()));
+            deferredRequests.get().getValue().add(requestMessage);
+
+            final Duration deferredDuration = 
Duration.between(deferredRequests.get().getKey(), LocalDateTime.now());
+
+            if (deferredDuration.compareTo(MAX_REQUEST_DEFERRABLE_DURATION) > 
0) {
+                respondWithError(
+                        requestMessage,
+                        builder -> builder.statusMessage("Authentication did 
not finish in the allowed duration (" + MAX_REQUEST_DEFERRABLE_DURATION + "s).")
+                                    .code(ResponseStatusCode.UNAUTHORIZED),
+                        ctx);
+                return;
+            }
+
+            return;
+        } else if (!requestMessage.getArgs().containsKey(Tokens.ARGS_SASL)) {
+            // This is an authentication request that is missing a "sasl" 
argument.
+            respondWithError(
+                    requestMessage,
+                    builder -> builder.statusMessage("Failed to 
authenticate").code(ResponseStatusCode.UNAUTHORIZED),
+                    ctx);
+            return;
+        }
+
+        final Object saslObject = 
requestMessage.getArgs().get(Tokens.ARGS_SASL);
+
+        if (!(saslObject instanceof String)) {
+            respondWithError(
+                    requestMessage,
+                    builder -> builder
+                            .statusMessage("Incorrect type for : " + 
Tokens.ARGS_SASL + " - base64 encoded String is expected")
+                            
.code(ResponseStatusCode.REQUEST_ERROR_MALFORMED_REQUEST),
+                    ctx);
+            return;
         }
+
+        try {
+            final byte[] saslResponse = BASE64_DECODER.decode((String) 
saslObject);
+            final byte[] saslMessage = 
negotiator.get().evaluateResponse(saslResponse);
+
+            if (!negotiator.get().isComplete()) {
+                // not done here - send back the sasl message for next 
challenge.
+                final HashMap<String, Object> metadata = new HashMap<>();
+                metadata.put(Tokens.ARGS_SASL, 
BASE64_ENCODER.encodeToString(saslMessage));
+                final ResponseMessage authenticate = 
ResponseMessage.build(requestMessage)
+                        .statusAttributes(metadata)
+                        .code(ResponseStatusCode.AUTHENTICATE).create();
+                ctx.writeAndFlush(authenticate);
+                return;
+            }
+
+            final org.apache.tinkerpop.gremlin.server.auth.AuthenticatedUser 
user = negotiator.get().getAuthenticatedUser();
+            ctx.channel().attr(StateKey.AUTHENTICATED_USER).set(user);
+            // User name logged with the remote socket address and 
authenticator classname for audit logging
+            if (settings.enableAuditLog) {
+                String address = ctx.channel().remoteAddress().toString();
+                if (address.startsWith("/") && address.length() > 1) address = 
address.substring(1);
+                final String[] authClassParts = 
authenticator.getClass().toString().split("[.]");
+                auditLogger.info("User {} with address {} authenticated by {}",
+                        user.getName(), address, 
authClassParts[authClassParts.length - 1]);
+            }
+            // If we have got here we are authenticated so remove the handler 
and pass
+            // the original message down the pipeline for processing
+            ctx.pipeline().remove(this);
+            final RequestMessage original = request.get();
+            ctx.fireChannelRead(original);
+
+            // Also send deferred requests if there are any down the pipeline 
for processing
+            if (deferredRequests.get() != null) {
+                
deferredRequests.getAndSet(null).getValue().forEach(ctx::fireChannelRead);
+            }
+        } catch (AuthenticationException ae) {
+            respondWithError(
+                    requestMessage,
+                    builder -> 
builder.statusMessage(ae.getMessage()).code(ResponseStatusCode.UNAUTHORIZED),
+                    ctx);
+        }
+    }
+
+    private void respondWithError(final RequestMessage requestMessage, final 
Function<ResponseMessage.Builder, ResponseMessage.Builder> buildResponse, final 
ChannelHandlerContext ctx) {
+        final Attribute<RequestMessage> originalRequest = 
ctx.channel().attr(StateKey.REQUEST_MESSAGE);
+        final Attribute<Pair<LocalDateTime, List<RequestMessage>>> 
deferredRequests = ctx.channel().attr(StateKey.DEFERRED_REQUEST_MESSAGES);
+
+        if (!requestMessage.getOp().equals(Tokens.OPS_AUTHENTICATION)) {
+            
ctx.write(buildResponse.apply(ResponseMessage.build(requestMessage)).create());
+        }
+
+        if (originalRequest.get() != null) {
+            
ctx.write(buildResponse.apply(ResponseMessage.build(originalRequest.get())).create());
+        }
+
+        if (deferredRequests.get() != null) {
+            deferredRequests
+                    .getAndSet(null).getValue().stream()
+                    .map(ResponseMessage::build)
+                    .map(buildResponse)
+                    .map(ResponseMessage.Builder::create)
+                    .forEach(ctx::write);
+        }
+
+        ctx.flush();
     }
 
-    private InetAddress getRemoteInetAddress(final ChannelHandlerContext ctx)
-    {
+    private InetAddress getRemoteInetAddress(final ChannelHandlerContext ctx) {
         final Channel channel = ctx.channel();
 
         if (null == channel)
@@ -172,9 +227,9 @@ public class SaslAuthenticationHandler extends 
AbstractAuthenticationHandler {
 
         final SocketAddress genericSocketAddr = channel.remoteAddress();
 
-        if (null == genericSocketAddr || !(genericSocketAddr instanceof 
InetSocketAddress))
+        if (!(genericSocketAddr instanceof InetSocketAddress))
             return null;
 
-        return ((InetSocketAddress)genericSocketAddr).getAddress();
+        return ((InetSocketAddress) genericSocketAddr).getAddress();
     }
 }
diff --git 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/StateKey.java
 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/StateKey.java
index cfb2e320ad..4f693942e1 100644
--- 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/StateKey.java
+++ 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/StateKey.java
@@ -18,6 +18,7 @@
  */
 package org.apache.tinkerpop.gremlin.server.handler;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.tinkerpop.gremlin.util.MessageSerializer;
 import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
 import org.apache.tinkerpop.gremlin.server.auth.AuthenticatedUser;
@@ -25,6 +26,9 @@ import org.apache.tinkerpop.gremlin.server.auth.Authenticator;
 import org.apache.tinkerpop.gremlin.server.op.session.Session;
 import io.netty.util.AttributeKey;
 
+import java.time.LocalDateTime;
+import java.util.List;
+
 /**
  * Keys used in the various handlers to store state in the pipeline.
  *
@@ -59,6 +63,11 @@ public final class StateKey {
      */
     public static final AttributeKey<RequestMessage> REQUEST_MESSAGE = 
AttributeKey.valueOf("request");
 
+    /**
+     * The key for the deferred requests.
+     */
+    public static final AttributeKey<Pair<LocalDateTime, 
List<RequestMessage>>> DEFERRED_REQUEST_MESSAGES = 
AttributeKey.valueOf("deferredRequests");
+
     /**
      * The key for the current {@link AuthenticatedUser}.
      */
diff --git 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java
 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java
index e39d8c2b44..01367ec49c 100644
--- 
a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java
+++ 
b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerAuthIntegrateTest.java
@@ -18,23 +18,34 @@
  */
 package org.apache.tinkerpop.gremlin.server;
 
-import org.apache.tinkerpop.gremlin.util.ExceptionHelper;
 import org.apache.tinkerpop.gremlin.driver.Client;
 import org.apache.tinkerpop.gremlin.driver.Cluster;
+import org.apache.tinkerpop.gremlin.driver.Result;
+import org.apache.tinkerpop.gremlin.driver.ResultSet;
 import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException;
 import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
+import org.apache.tinkerpop.gremlin.driver.simple.WebSocketClient;
 import org.apache.tinkerpop.gremlin.server.auth.SimpleAuthenticator;
-import org.ietf.jgss.GSSException;
+import org.apache.tinkerpop.gremlin.server.handler.SaslAuthenticationHandler;
 import org.apache.tinkerpop.gremlin.structure.Property;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.util.ExceptionHelper;
+import org.apache.tinkerpop.gremlin.util.Tokens;
+import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
+import org.apache.tinkerpop.gremlin.util.message.ResponseMessage;
+import org.apache.tinkerpop.gremlin.util.message.ResponseStatusCode;
+import org.apache.tinkerpop.gremlin.util.ser.Serializers;
+import org.ietf.jgss.GSSException;
 import org.junit.Test;
 
+import java.time.Duration;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-
-import org.apache.tinkerpop.gremlin.util.ser.Serializers;
+import java.util.concurrent.TimeUnit;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.AnyOf.anyOf;
@@ -163,6 +174,68 @@ public class GremlinServerAuthIntegrateTest extends 
AbstractGremlinServerIntegra
         }
     }
 
+    @Test
+    public void 
shouldFailAuthenticateWithUnAuthenticatedRequestAfterMaxDeferrableDuration() 
throws Exception {
+        try (WebSocketClient client = 
TestClientFactory.createWebSocketClient()) {
+            // First request will initiate the authentication handshake
+            // Subsequent requests will be deferred
+            CompletableFuture<List<ResponseMessage>> 
futureOfRequestWithinAuthDuration1  = client.submitAsync("");
+            CompletableFuture<List<ResponseMessage>> 
futureOfRequestWithinAuthDuration2  = client.submitAsync("");
+            CompletableFuture<List<ResponseMessage>> 
futureOfRequestWithinAuthDuration3  = client.submitAsync("");
+
+            // After the maximum allowed deferred request duration,
+            // any non-authenticated request will invalidate all requests with 
429 error
+            CompletableFuture<List<ResponseMessage>> 
futureOfRequestSubmittedTooLate = CompletableFuture.runAsync(() -> {
+                try {
+                    
Thread.sleep(SaslAuthenticationHandler.MAX_REQUEST_DEFERRABLE_DURATION.plus(Duration.ofSeconds(1)).toMillis());
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            }).thenCompose((__) -> {
+                try {
+                    return client.submitAsync("");
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            });
+
+            assertEquals(2, futureOfRequestWithinAuthDuration1.get().size());
+            assertEquals(1, futureOfRequestWithinAuthDuration2.get().size());
+            assertEquals(1, futureOfRequestWithinAuthDuration3.get().size());
+            assertEquals(1, futureOfRequestSubmittedTooLate.get().size());
+
+            assertEquals(ResponseStatusCode.AUTHENTICATE, 
futureOfRequestWithinAuthDuration1.get().get(0).getStatus().getCode());
+            assertEquals(ResponseStatusCode.UNAUTHORIZED, 
futureOfRequestWithinAuthDuration1.get().get(1).getStatus().getCode());
+            assertEquals(ResponseStatusCode.UNAUTHORIZED, 
futureOfRequestWithinAuthDuration2.get().get(0).getStatus().getCode());
+            assertEquals(ResponseStatusCode.UNAUTHORIZED, 
futureOfRequestWithinAuthDuration3.get().get(0).getStatus().getCode());
+            assertEquals(ResponseStatusCode.UNAUTHORIZED, 
futureOfRequestSubmittedTooLate.get().get(0).getStatus().getCode());
+        }
+    }
+
+    @Test
+    public void shouldFailAuthenticateWithIncorrectParallelRequests() throws 
Exception {
+        try (WebSocketClient client = 
TestClientFactory.createWebSocketClient()) {
+
+            CompletableFuture<List<ResponseMessage>> firstRequest = 
client.submitAsync("1");
+            CompletableFuture<List<ResponseMessage>> secondRequest  = 
client.submitAsync("2");
+            CompletableFuture<List<ResponseMessage>> thirdRequest  = 
client.submitAsync("3");
+
+            Thread.sleep(500);
+
+            // send some incorrect value for username password which should 
cause all requests to fail.
+            
client.submitAsync(RequestMessage.build(Tokens.OPS_AUTHENTICATION).addArg(Tokens.ARGS_SASL,
 "someincorrectvalue").create());
+
+            assertEquals(2, firstRequest.get().size());
+            assertEquals(1, secondRequest.get().size());
+            assertEquals(1, thirdRequest.get().size());
+
+            assertEquals(ResponseStatusCode.AUTHENTICATE, 
firstRequest.get().get(0).getStatus().getCode());
+            assertEquals(ResponseStatusCode.UNAUTHORIZED, 
firstRequest.get().get(1).getStatus().getCode());
+            assertEquals(ResponseStatusCode.UNAUTHORIZED, 
secondRequest.get().get(0).getStatus().getCode());
+            assertEquals(ResponseStatusCode.UNAUTHORIZED, 
thirdRequest.get().get(0).getStatus().getCode());
+        }
+    }
+
     @Test
     public void shouldAuthenticateWithPlainTextOverDefaultJSONSerialization() 
throws Exception {
         final Cluster cluster = 
TestClientFactory.build().serializer(Serializers.GRAPHSON)

Reply via email to