[ https://issues.apache.org/jira/browse/TINKERPOP-2132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16745708#comment-16745708 ]
kaiyangzhang edited comment on TINKERPOP-2132 at 1/18/19 6:24 AM: ------------------------------------------------------------------ [~spmallette] Thank you for your reply. I thought of a temporary solution, though it is not a good one: in a concurrent scenario, when _"SaslAuthenticationHandler"_ receives the first message, authentication starts, and other messages are ignored during the authentication process. {code:java} public class JanusGraphWebSocketChannelizer extends WebSocketChannelizer { private static final String CHCECK_AUTHENTICATOR = "check-authenticator"; @Override public void init(final ServerGremlinExecutor serverGremlinExecutor) { super.init(serverGremlinExecutor); } @Override public void configure(ChannelPipeline pipeline) { super.configure(pipeline); if(pipeline.names().contains(AbstractChannelizer.PIPELINE_AUTHENTICATOR)) { pipeline.addBefore(PIPELINE_AUTHENTICATOR, CHCECK_AUTHENTICATOR, new JanusgraphCheckAuthenticationHandler()); } } } {code} {code:java} public class JanusgraphCheckAuthenticationHandler extends ChannelInboundHandlerAdapter { private static final Logger LOGGER = LoggerFactory.getLogger(JanusgraphCheckAuthenticationHandler.class); private volatile AtomicBoolean authentication; public JanusgraphCheckAuthenticationHandler() { authentication = new AtomicBoolean(false); } @Override public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception { /** Handle only ordinary gremlin requests*/ if (msg instanceof RequestMessage && !((RequestMessage)msg).getOp().equals(Tokens.OPS_AUTHENTICATION)) { /** During the certification process, Ignore other Gremlin requests*/ if(authentication.get()) { channelReadInAuthenticationProcess(ctx, msg); } else { synchronized (this) { if(authentication.get()) { channelReadInAuthenticationProcess(ctx, msg); } else { /** Receive gemlin request for the first time, start process authentication */ authentication.compareAndSet(false, true); ctx.fireChannelRead(msg); } } } } else { 
ctx.fireChannelRead(msg); } } private void channelReadInAuthenticationProcess(final ChannelHandlerContext ctx, final Object msg) { if(!ctx.pipeline().names().contains(AbstractChannelizer.PIPELINE_AUTHENTICATOR)) { LOGGER.debug("-------------remove JanusgraphCheckAuthenticationHandler start-----------"); ctx.pipeline().remove(this); LOGGER.debug("-------------remove JanusgraphCheckAuthenticationHandler end-----------"); ctx.fireChannelRead(msg); } else { RequestMessage oldMessage = (RequestMessage)msg; RequestMessage messageReplicas = RequestMessage.from(oldMessage) .addArg("ACTION", "IGNORE_AUTHENTICATION") .create(); LOGGER.debug("----------------- ignore authentication--------------"); ctx.fireChannelRead(messageReplicas); } } }{code} {code:java} public class JanusgraphSaslAuthenticationHandler extends AbstractAuthenticationHandler { private static final Logger logger = LoggerFactory.getLogger(SaslAuthenticationHandler.class); private static final Base64.Decoder BASE64_DECODER = Base64.getDecoder(); private static final Base64.Encoder BASE64_ENCODER = Base64.getEncoder(); private static final Logger auditLogger = LoggerFactory.getLogger("audit.org.apache.tinkerpop.gremlin.server"); protected final Settings.AuthenticationSettings authenticationSettings; public JanusgraphSaslAuthenticationHandler(Authenticator authenticator, Settings.AuthenticationSettings authenticationSettings) { super(authenticator); this.authenticationSettings = authenticationSettings; } public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if(msg instanceof RequestMessage) { RequestMessage requestMessage = (RequestMessage)msg; Attribute negotiator = ctx.attr(StateKey.NEGOTIATOR); Attribute request = ctx.attr(StateKey.REQUEST_MESSAGE); ResponseMessage error; if(requestMessage.getArgs().containsKey("ACTION")){ ctx.fireChannelRead(msg); }else if(negotiator.get() == null) { negotiator.set(this.authenticator.newSaslNegotiator(this.getRemoteInetAddress(ctx))); 
request.set(requestMessage); error = ResponseMessage.build(requestMessage).code(ResponseStatusCode.AUTHENTICATE).create(); ctx.writeAndFlush(error); }else if(requestMessage.getOp().equals("authentication") && requestMessage.getArgs().containsKey("sasl")) { Object error2 = requestMessage.getArgs().get("sasl"); if(!(error2 instanceof String)) { ........... ........... ...........{code} was (Author: kaiyangzhang): [~spmallette] Thank you for your reply. I thought of a temporary solution, though it is not a good one: in a concurrent scenario, when _"SaslAuthenticationHandler"_ receives the first message, authentication starts, and other messages are ignored during the authentication process. {code:java} public class JanusGraphWebSocketChannelizer extends WebSocketChannelizer { private static final String CHCECK_AUTHENTICATOR = "check-authenticator"; @Override public void init(final ServerGremlinExecutor serverGremlinExecutor) { super.init(serverGremlinExecutor); } @Override public void configure(ChannelPipeline pipeline) { super.configure(pipeline); if(pipeline.names().contains(AbstractChannelizer.PIPELINE_AUTHENTICATOR)) { *pipeline.addBefore(PIPELINE_AUTHENTICATOR, CHCECK_AUTHENTICATOR, new JanusgraphCheckAuthenticationHandler());* } } } {code} {code:java} public class JanusgraphCheckAuthenticationHandler extends ChannelInboundHandlerAdapter { private static final Logger LOGGER = LoggerFactory.getLogger(JanusgraphCheckAuthenticationHandler.class); private volatile AtomicBoolean authentication; public JanusgraphCheckAuthenticationHandler() { authentication = new AtomicBoolean(false); } @Override public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception { /** Handle only ordinary gremlin requests*/ if (msg instanceof RequestMessage && !((RequestMessage)msg).getOp().equals(Tokens.OPS_AUTHENTICATION)) { /** During the certification process, Ignore other Gremlin requests*/ if(authentication.get()) { channelReadInAuthenticationProcess(ctx, msg); } else { 
synchronized (this) { if(authentication.get()) { channelReadInAuthenticationProcess(ctx, msg); } else { /** Receive gemlin request for the first time, start process authentication */ authentication.compareAndSet(false, true); ctx.fireChannelRead(msg); } } } } else { ctx.fireChannelRead(msg); } } private void channelReadInAuthenticationProcess(final ChannelHandlerContext ctx, final Object msg) { if(!ctx.pipeline().names().contains(AbstractChannelizer.PIPELINE_AUTHENTICATOR)) { LOGGER.debug("-------------remove JanusgraphCheckAuthenticationHandler start-----------"); ctx.pipeline().remove(this); LOGGER.debug("-------------remove JanusgraphCheckAuthenticationHandler end-----------"); ctx.fireChannelRead(msg); } else { RequestMessage oldMessage = (RequestMessage)msg; RequestMessage messageReplicas = RequestMessage.from(oldMessage) .addArg("ACTION", "IGNORE_AUTHENTICATION") .create(); LOGGER.debug("----------------- ignore authentication--------------"); ctx.fireChannelRead(messageReplicas); } } }{code} {code:java} public class JanusgraphSaslAuthenticationHandler extends AbstractAuthenticationHandler { private static final Logger logger = LoggerFactory.getLogger(SaslAuthenticationHandler.class); private static final Base64.Decoder BASE64_DECODER = Base64.getDecoder(); private static final Base64.Encoder BASE64_ENCODER = Base64.getEncoder(); private static final Logger auditLogger = LoggerFactory.getLogger("audit.org.apache.tinkerpop.gremlin.server"); protected final Settings.AuthenticationSettings authenticationSettings; public JanusgraphSaslAuthenticationHandler(Authenticator authenticator, Settings.AuthenticationSettings authenticationSettings) { super(authenticator); this.authenticationSettings = authenticationSettings; } public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if(msg instanceof RequestMessage) { RequestMessage requestMessage = (RequestMessage)msg; Attribute negotiator = ctx.attr(StateKey.NEGOTIATOR); Attribute request = 
ctx.attr(StateKey.REQUEST_MESSAGE); ResponseMessage error; if(requestMessage.getArgs().containsKey("ACTION")){ ctx.fireChannelRead(msg); }else if(negotiator.get() == null) { negotiator.set(this.authenticator.newSaslNegotiator(this.getRemoteInetAddress(ctx))); request.set(requestMessage); error = ResponseMessage.build(requestMessage).code(ResponseStatusCode.AUTHENTICATE).create(); ctx.writeAndFlush(error); }else if(requestMessage.getOp().equals("authentication") && requestMessage.getArgs().containsKey("sasl")) { Object error2 = requestMessage.getArgs().get("sasl"); if(!(error2 instanceof String)) { ........... ........... ...........{code} > Authentication when using multiple threads fails > ------------------------------------------------ > > Key: TINKERPOP-2132 > URL: https://issues.apache.org/jira/browse/TINKERPOP-2132 > Project: TinkerPop > Issue Type: Bug > Components: driver > Affects Versions: 3.3.2 > Reporter: kaiyangzhang > Assignee: stephen mallette > Priority: Major > > *Scenarios:* > 1. Gremlin Server Kerberos Authentication > 2. 
Multithreading using the same client > > {code:java} > DriverRemoteConnection connection = > DriverRemoteConnection.using(cluster,"graphbase"); > GraphTraversalSource g = graph.traversal().withRemote(connection); > Thread demo1 = new Thread(new ThreadDemo1(g)); > Thread demo2 = new Thread(new ThreadDemo1(g)); > Thread demo3 = new Thread(new ThreadDemo1(g)); > Thread demo4 = new Thread(new ThreadDemo1(g)); > Thread demo5 = new Thread(new ThreadDemo1(g)); > Thread demo6 = new Thread(new ThreadDemo1(g)); > Thread demo7 = new Thread(new ThreadDemo1(g)); > Thread demo8 = new Thread(new ThreadDemo1(g)); > Thread demo9 = new Thread(new ThreadDemo1(g)); > Thread demo10 = new Thread(new ThreadDemo1(g)); > {code} > > *ERROR INFO* > {code:java} > Exception in thread "Thread-4" java.util.concurrent.CompletionException: > org.apache.tinkerpop.gremlin.driver.exception.ResponseException: Failed to > authenticate > at > java.util.concurrent.CompletableFuture.reportJoin(CompletableFuture.java:375) > at java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1934) > at org.apache.tinkerpop.gremlin.driver.ResultSet.one(ResultSet.java:107) > at > org.apache.tinkerpop.gremlin.driver.ResultSet$1.hasNext(ResultSet.java:159) > at org.apache.tinkerpop.gremlin.driver.ResultSet$1.next(ResultSet.java:166) > at org.apache.tinkerpop.gremlin.driver.ResultSet$1.next(ResultSet.java:153) > at > org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteTraversal$TraverserIterator.next(DriverRemoteTraversal.java:142) > at > org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteTraversal$TraverserIterator.next(DriverRemoteTraversal.java:127) > at > org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteTraversal.nextTraverser(DriverRemoteTraversal.java:108) > at > org.apache.tinkerpop.gremlin.process.remote.traversal.step.map.RemoteStep.processNextStart(RemoteStep.java:80) > at > org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep.hasNext(AbstractStep.java:143) > at > 
org.apache.tinkerpop.gremlin.process.traversal.util.DefaultTraversal.hasNext(DefaultTraversal.java:192) > at com.huawei.graphbase.gremlin.ThreadDemo1.println(ThreadDemo1.java:48) > at com.huawei.graphbase.gremlin.ThreadDemo1.run(ThreadDemo1.java:32) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.tinkerpop.gremlin.driver.exception.ResponseException: > Failed to authenticate > at > org.apache.tinkerpop.gremlin.driver.Handler$GremlinResponseHandler.channelRead0(Handler.java:246) > at > org.apache.tinkerpop.gremlin.driver.Handler$GremlinResponseHandler.channelRead0(Handler.java:197) > at > io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) > at > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351) > at > org.apache.tinkerpop.gremlin.driver.Handler$GremlinSaslAuthenticationHandler.channelRead0(Handler.java:123) > at > org.apache.tinkerpop.gremlin.driver.Handler$GremlinSaslAuthenticationHandler.channelRead0(Handler.java:67) > at > io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) > at > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351) > at > io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373) > at > 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) > at > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351) > at > org.apache.tinkerpop.gremlin.driver.handler.WebSocketClientHandler.channelRead0(WebSocketClientHandler.java:94) > at > io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) > at > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351) > at > io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293) > at > io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)v[2072680] > at > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351) > at > io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) > at > io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926) > at > io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129) > at > io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:651) > at > 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:574) > at > io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:488) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:450) > at > io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873) > ... 1 more{code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)