[ 
https://issues.apache.org/jira/browse/TINKERPOP-2132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16745708#comment-16745708
 ] 

kaiyangzhang edited comment on TINKERPOP-2132 at 1/18/19 6:24 AM:
------------------------------------------------------------------

[~spmallette] Thank you for your reply.

I thought of a temporary solution, though it is not a good approach:

In a concurrent scenario, when _"SaslAuthenticationHandler"_ receives the first 
message, it starts authentication. Other messages are ignored by the 
authentication handler while authentication is in progress.
{code:java}
/**
 * WebSocket channelizer that, when the pipeline contains an authentication
 * handler, installs a {@link JanusgraphCheckAuthenticationHandler} directly
 * in front of it.
 */
public class JanusGraphWebSocketChannelizer extends WebSocketChannelizer
{
    /** Pipeline name under which the check handler is registered. */
    private static final String CHECK_AUTHENTICATOR = "check-authenticator";

    /**
     * Applies the inherited pipeline configuration and then, if an
     * authentication handler is present, adds the check handler before it.
     *
     * @param pipeline the channel pipeline being configured
     */
    @Override
    public void configure(final ChannelPipeline pipeline)
    {
        super.configure(pipeline);

        // Only guard the authenticator when the pipeline actually has one.
        if (pipeline.names().contains(AbstractChannelizer.PIPELINE_AUTHENTICATOR))
        {
            pipeline.addBefore(AbstractChannelizer.PIPELINE_AUTHENTICATOR, CHECK_AUTHENTICATOR,
                    new JanusgraphCheckAuthenticationHandler());
        }
    }
}
{code}
 
{code:java}
public class JanusgraphCheckAuthenticationHandler extends 
ChannelInboundHandlerAdapter
{

private static final Logger LOGGER = 
LoggerFactory.getLogger(JanusgraphCheckAuthenticationHandler.class);

private volatile AtomicBoolean authentication;

public JanusgraphCheckAuthenticationHandler()
{
    authentication = new AtomicBoolean(false);
}

@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) 
throws Exception
{
/** Handle only ordinary gremlin requests*/
if (msg instanceof RequestMessage && 
!((RequestMessage)msg).getOp().equals(Tokens.OPS_AUTHENTICATION))
{
/** During the certification process, Ignore other Gremlin requests*/
if(authentication.get())
{
channelReadInAuthenticationProcess(ctx, msg);
}
else
{
synchronized (this)
{
if(authentication.get())
{
channelReadInAuthenticationProcess(ctx, msg);
}
else
{
/** Receive gemlin request for the first time, start process authentication */
authentication.compareAndSet(false, true);
ctx.fireChannelRead(msg);
}
}
}
}
else
{
ctx.fireChannelRead(msg);
}
}


private void channelReadInAuthenticationProcess(final ChannelHandlerContext 
ctx, final Object msg)
{
if(!ctx.pipeline().names().contains(AbstractChannelizer.PIPELINE_AUTHENTICATOR))
{
LOGGER.debug("-------------remove JanusgraphCheckAuthenticationHandler 
start-----------");
ctx.pipeline().remove(this);
LOGGER.debug("-------------remove JanusgraphCheckAuthenticationHandler 
end-----------");
ctx.fireChannelRead(msg);
}
else
{
RequestMessage oldMessage = (RequestMessage)msg;
RequestMessage messageReplicas = RequestMessage.from(oldMessage)
.addArg("ACTION", "IGNORE_AUTHENTICATION")
.create();
LOGGER.debug("----------------- ignore authentication--------------");
ctx.fireChannelRead(messageReplicas);
}
}
}{code}
 

 

 
{code:java}
public class JanusgraphSaslAuthenticationHandler extends 
AbstractAuthenticationHandler
{
    private static final Logger logger = 
LoggerFactory.getLogger(SaslAuthenticationHandler.class);
    private static final Base64.Decoder BASE64_DECODER = Base64.getDecoder();
    private static final Base64.Encoder BASE64_ENCODER = Base64.getEncoder();
    private static final Logger auditLogger = 
LoggerFactory.getLogger("audit.org.apache.tinkerpop.gremlin.server");
    protected final Settings.AuthenticationSettings authenticationSettings;


    public JanusgraphSaslAuthenticationHandler(Authenticator authenticator, 
Settings.AuthenticationSettings authenticationSettings) {
        super(authenticator);
        this.authenticationSettings = authenticationSettings;
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
        if(msg instanceof RequestMessage) {
            RequestMessage requestMessage = (RequestMessage)msg;
            Attribute negotiator = ctx.attr(StateKey.NEGOTIATOR);
            Attribute request = ctx.attr(StateKey.REQUEST_MESSAGE);
            ResponseMessage error;
            if(requestMessage.getArgs().containsKey("ACTION")){
                ctx.fireChannelRead(msg);
            }else if(negotiator.get() == null) {
                
negotiator.set(this.authenticator.newSaslNegotiator(this.getRemoteInetAddress(ctx)));
                request.set(requestMessage);
                error = 
ResponseMessage.build(requestMessage).code(ResponseStatusCode.AUTHENTICATE).create();
                ctx.writeAndFlush(error);
            }else if(requestMessage.getOp().equals("authentication") && 
requestMessage.getArgs().containsKey("sasl")) {
                Object error2 = requestMessage.getArgs().get("sasl");
                if(!(error2 instanceof String)) {
   ...........
   ...........
   ...........{code}
 

 


was (Author: kaiyangzhang):
[~spmallette] Thank you for your reply.

I thought of a temporary solution, though it is not a good approach:

In a concurrent scenario, when _"SaslAuthenticationHandler"_ receives the first 
message, it starts authentication. Other messages are ignored by the 
authentication handler while authentication is in progress.
{code:java}
public class JanusGraphWebSocketChannelizer extends WebSocketChannelizer
{
    private static final String CHCECK_AUTHENTICATOR = "check-authenticator";

    @Override
    public void init(final ServerGremlinExecutor serverGremlinExecutor) {
        super.init(serverGremlinExecutor);
    }

    @Override
    public void configure(ChannelPipeline pipeline)
    {
        super.configure(pipeline);
       
        
if(pipeline.names().contains(AbstractChannelizer.PIPELINE_AUTHENTICATOR))
        {
           *pipeline.addBefore(PIPELINE_AUTHENTICATOR, CHCECK_AUTHENTICATOR, 
new JanusgraphCheckAuthenticationHandler());*
        }
    }
}
{code}
 
{code:java}
public class JanusgraphCheckAuthenticationHandler extends 
ChannelInboundHandlerAdapter
{

private static final Logger LOGGER = 
LoggerFactory.getLogger(JanusgraphCheckAuthenticationHandler.class);

private volatile AtomicBoolean authentication;

public JanusgraphCheckAuthenticationHandler()
{
    authentication = new AtomicBoolean(false);
}

@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) 
throws Exception
{
/** Handle only ordinary gremlin requests*/
if (msg instanceof RequestMessage && 
!((RequestMessage)msg).getOp().equals(Tokens.OPS_AUTHENTICATION))
{
/** During the certification process, Ignore other Gremlin requests*/
if(authentication.get())
{
channelReadInAuthenticationProcess(ctx, msg);
}
else
{
synchronized (this)
{
if(authentication.get())
{
channelReadInAuthenticationProcess(ctx, msg);
}
else
{
/** Receive gemlin request for the first time, start process authentication */
authentication.compareAndSet(false, true);
ctx.fireChannelRead(msg);
}
}
}
}
else
{
ctx.fireChannelRead(msg);
}
}


private void channelReadInAuthenticationProcess(final ChannelHandlerContext 
ctx, final Object msg)
{
if(!ctx.pipeline().names().contains(AbstractChannelizer.PIPELINE_AUTHENTICATOR))
{
LOGGER.debug("-------------remove JanusgraphCheckAuthenticationHandler 
start-----------");
ctx.pipeline().remove(this);
LOGGER.debug("-------------remove JanusgraphCheckAuthenticationHandler 
end-----------");
ctx.fireChannelRead(msg);
}
else
{
RequestMessage oldMessage = (RequestMessage)msg;
RequestMessage messageReplicas = RequestMessage.from(oldMessage)
.addArg("ACTION", "IGNORE_AUTHENTICATION")
.create();
LOGGER.debug("----------------- ignore authentication--------------");
ctx.fireChannelRead(messageReplicas);
}
}
}{code}
 

 

 
{code:java}
public class JanusgraphSaslAuthenticationHandler extends 
AbstractAuthenticationHandler
{
    private static final Logger logger = 
LoggerFactory.getLogger(SaslAuthenticationHandler.class);
    private static final Base64.Decoder BASE64_DECODER = Base64.getDecoder();
    private static final Base64.Encoder BASE64_ENCODER = Base64.getEncoder();
    private static final Logger auditLogger = 
LoggerFactory.getLogger("audit.org.apache.tinkerpop.gremlin.server");
    protected final Settings.AuthenticationSettings authenticationSettings;


    public JanusgraphSaslAuthenticationHandler(Authenticator authenticator, 
Settings.AuthenticationSettings authenticationSettings) {
        super(authenticator);
        this.authenticationSettings = authenticationSettings;
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
        if(msg instanceof RequestMessage) {
            RequestMessage requestMessage = (RequestMessage)msg;
            Attribute negotiator = ctx.attr(StateKey.NEGOTIATOR);
            Attribute request = ctx.attr(StateKey.REQUEST_MESSAGE);
            ResponseMessage error;
            if(requestMessage.getArgs().containsKey("ACTION")){
                ctx.fireChannelRead(msg);
            }else if(negotiator.get() == null) {
                
negotiator.set(this.authenticator.newSaslNegotiator(this.getRemoteInetAddress(ctx)));
                request.set(requestMessage);
                error = 
ResponseMessage.build(requestMessage).code(ResponseStatusCode.AUTHENTICATE).create();
                ctx.writeAndFlush(error);
            }else if(requestMessage.getOp().equals("authentication") && 
requestMessage.getArgs().containsKey("sasl")) {
                Object error2 = requestMessage.getArgs().get("sasl");
                if(!(error2 instanceof String)) {
   ...........
   ...........
   ...........{code}
 

 

> Authentication when using multiple threads fails
> ------------------------------------------------
>
>                 Key: TINKERPOP-2132
>                 URL: https://issues.apache.org/jira/browse/TINKERPOP-2132
>             Project: TinkerPop
>          Issue Type: Bug
>          Components: driver
>    Affects Versions: 3.3.2
>            Reporter: kaiyangzhang
>            Assignee: stephen mallette
>            Priority: Major
>
> *Scenes:*
>    1. Gremlin Server  Kerberos Authentication
>    2. Multithreading using the same client
>  
> {code:java}
>        DriverRemoteConnection connection = 
> DriverRemoteConnection.using(cluster,"graphbase");
>         GraphTraversalSource g = graph.traversal().withRemote(connection);
>       Thread demo1 = new Thread(new ThreadDemo1(g));
>        Thread demo2 = new Thread(new ThreadDemo1(g));
>        Thread demo3 = new Thread(new ThreadDemo1(g));
>        Thread demo4 = new Thread(new ThreadDemo1(g));
>        Thread demo5 = new Thread(new ThreadDemo1(g));
>       Thread demo6 = new Thread(new ThreadDemo1(g));
>        Thread demo7 = new Thread(new ThreadDemo1(g)); 
>        Thread demo8 = new Thread(new ThreadDemo1(g));
>        Thread demo9 = new Thread(new ThreadDemo1(g));
>        Thread demo10 = new Thread(new ThreadDemo1(g));
> {code}
>  
> *ERROR INFO*
> {code:java}
> Exception in thread "Thread-4" java.util.concurrent.CompletionException: 
> org.apache.tinkerpop.gremlin.driver.exception.ResponseException: Failed to 
> authenticate
>  at 
> java.util.concurrent.CompletableFuture.reportJoin(CompletableFuture.java:375)
>  at java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1934)
>  at org.apache.tinkerpop.gremlin.driver.ResultSet.one(ResultSet.java:107)
>  at 
> org.apache.tinkerpop.gremlin.driver.ResultSet$1.hasNext(ResultSet.java:159)
>  at org.apache.tinkerpop.gremlin.driver.ResultSet$1.next(ResultSet.java:166)
>  at org.apache.tinkerpop.gremlin.driver.ResultSet$1.next(ResultSet.java:153)
>  at 
> org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteTraversal$TraverserIterator.next(DriverRemoteTraversal.java:142)
>  at 
> org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteTraversal$TraverserIterator.next(DriverRemoteTraversal.java:127)
>  at 
> org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteTraversal.nextTraverser(DriverRemoteTraversal.java:108)
>  at 
> org.apache.tinkerpop.gremlin.process.remote.traversal.step.map.RemoteStep.processNextStart(RemoteStep.java:80)
>  at 
> org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep.hasNext(AbstractStep.java:143)
>  at 
> org.apache.tinkerpop.gremlin.process.traversal.util.DefaultTraversal.hasNext(DefaultTraversal.java:192)
>  at com.huawei.graphbase.gremlin.ThreadDemo1.println(ThreadDemo1.java:48)
>  at com.huawei.graphbase.gremlin.ThreadDemo1.run(ThreadDemo1.java:32)
>  at java.lang.Thread.run(Thread.java:748)
>  Caused by: org.apache.tinkerpop.gremlin.driver.exception.ResponseException: 
> Failed to authenticate
>  at 
> org.apache.tinkerpop.gremlin.driver.Handler$GremlinResponseHandler.channelRead0(Handler.java:246)
>  at 
> org.apache.tinkerpop.gremlin.driver.Handler$GremlinResponseHandler.channelRead0(Handler.java:197)
>  at 
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>  at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
>  at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
>  at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
>  at 
> org.apache.tinkerpop.gremlin.driver.Handler$GremlinSaslAuthenticationHandler.channelRead0(Handler.java:123)
>  at 
> org.apache.tinkerpop.gremlin.driver.Handler$GremlinSaslAuthenticationHandler.channelRead0(Handler.java:67)
>  at 
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>  at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
>  at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
>  at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
>  at 
> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
>  at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
>  at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
>  at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
>  at 
> org.apache.tinkerpop.gremlin.driver.handler.WebSocketClientHandler.channelRead0(WebSocketClientHandler.java:94)
>  at 
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>  at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
>  at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
>  at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
>  at 
> io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
>  at 
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
>  at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
>  at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)v[2072680]
>  at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
>  at 
> io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
>  at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
>  at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
>  at 
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
>  at 
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129)
>  at 
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:651)
>  at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:574)
>  at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:488)
>  at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:450)
>  at 
> io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)
>  ... 1 more{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to