Github user danny0405 commented on a diff in the pull request: https://github.com/apache/storm/pull/2704#discussion_r193434753 --- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/SaslStormClientHandler.java --- @@ -41,80 +38,88 @@ public SaslStormClientHandler(ISaslClient client) throws IOException { } @Override - public void channelConnected(ChannelHandlerContext ctx, - ChannelStateEvent event) { - // register the newly established channel - Channel channel = ctx.getChannel(); - client.channelConnected(channel); + public void channelActive(ChannelHandlerContext ctx) { + Channel channel = ctx.channel(); + LOG.info("Connection established from " + channel.localAddress() + + " to " + channel.remoteAddress()); try { - SaslNettyClient saslNettyClient = SaslNettyClientState.getSaslNettyClient - .get(channel); + SaslNettyClient saslNettyClient = channel.attr(SaslNettyClientState.SASL_NETTY_CLIENT).get(); if (saslNettyClient == null) { LOG.debug("Creating saslNettyClient now " + "for channel: " + channel); saslNettyClient = new SaslNettyClient(name, token); - SaslNettyClientState.getSaslNettyClient.set(channel, - saslNettyClient); + channel.attr(SaslNettyClientState.SASL_NETTY_CLIENT).set(saslNettyClient); } LOG.debug("Sending SASL_TOKEN_MESSAGE_REQUEST"); - channel.write(ControlMessage.SASL_TOKEN_MESSAGE_REQUEST); + channel.writeAndFlush(ControlMessage.SASL_TOKEN_MESSAGE_REQUEST, channel.voidPromise()); } catch (Exception e) { LOG.error("Failed to authenticate with server " + "due to error: ", e); } } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) - throws Exception { + public void channelRead(ChannelHandlerContext ctx, Object message) throws Exception { LOG.debug("send/recv time (ms): {}", (System.currentTimeMillis() - start_time)); - Channel channel = ctx.getChannel(); - + // examine the response message from server + if (message instanceof ControlMessage) { + handleControlMessage(ctx, (ControlMessage) message); + } 
else if (message instanceof SaslMessageToken) { + handleSaslMessageToken(ctx, (SaslMessageToken) message); + } else { + LOG.error("Unexpected message from server: {}", message); + } + } + + private SaslNettyClient getChannelSaslNettyClient(Channel channel) throws Exception { // Generate SASL response to server using Channel-local SASL client. - SaslNettyClient saslNettyClient = SaslNettyClientState.getSaslNettyClient - .get(channel); + SaslNettyClient saslNettyClient = channel.attr(SaslNettyClientState.SASL_NETTY_CLIENT).get(); if (saslNettyClient == null) { throw new Exception("saslNettyClient was unexpectedly " - + "null for channel: " + channel); + + "null for channel: " + channel); } + return saslNettyClient; + } + + private void handleControlMessage(ChannelHandlerContext ctx, ControlMessage controlMessage) throws Exception { + SaslNettyClient saslNettyClient = getChannelSaslNettyClient(ctx.channel()); + if (controlMessage == ControlMessage.SASL_COMPLETE_REQUEST) { + LOG.debug("Server has sent us the SaslComplete " + + "message. Allowing normal work to proceed."); - // examine the response message from server - if (event.getMessage() instanceof ControlMessage) { - ControlMessage msg = (ControlMessage) event.getMessage(); - if (msg == ControlMessage.SASL_COMPLETE_REQUEST) { - LOG.debug("Server has sent us the SaslComplete " - + "message. Allowing normal work to proceed."); - - if (!saslNettyClient.isComplete()) { - LOG.error("Server returned a Sasl-complete message, " - + "but as far as we can tell, we are not authenticated yet."); - throw new Exception("Server returned a " - + "Sasl-complete message, but as far as " - + "we can tell, we are not authenticated yet."); - } - ctx.getPipeline().remove(this); - this.client.channelReady(); - - // We call fireMessageReceived since the client is allowed to - // perform this request. The client's request will now proceed - // to the next pipeline component namely StormClientHandler. 
- Channels.fireMessageReceived(ctx, msg); - return; + if (!saslNettyClient.isComplete()) { + LOG.error("Server returned a Sasl-complete message, " + + "but as far as we can tell, we are not authenticated yet."); + throw new Exception("Server returned a " + + "Sasl-complete message, but as far as " + + "we can tell, we are not authenticated yet."); } + ctx.pipeline().remove(this); + this.client.channelReady(ctx.channel()); + + // We call fireMessageRead since the client is allowed to + // perform this request. The client's request will now proceed + // to the next pipeline component namely StormClientHandler. + ctx.fireChannelRead(controlMessage); + } else { + LOG.warn("Unexpected control message: {}", controlMessage); } - SaslMessageToken saslTokenMessage = (SaslMessageToken) event - .getMessage(); + } + + private void handleSaslMessageToken(ChannelHandlerContext ctx, SaslMessageToken saslMessageToken) throws Exception { + Channel channel = ctx.channel(); + SaslNettyClient saslNettyClient = getChannelSaslNettyClient(channel); LOG.debug("Responding to server's token of length: " - + saslTokenMessage.getSaslToken().length); + + saslMessageToken.getSaslToken().length); --- End diff -- nit: indentation — the continuation line just above (`+ saslMessageToken.getSaslToken().length);`) looks mis-indented.
---