GitHub user danny0405 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2704#discussion_r193434753
  
    --- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/SaslStormClientHandler.java ---
    @@ -41,80 +38,88 @@ public SaslStormClientHandler(ISaslClient client) 
throws IOException {
         }
     
         @Override
    -    public void channelConnected(ChannelHandlerContext ctx,
    -                                 ChannelStateEvent event) {
    -        // register the newly established channel
    -        Channel channel = ctx.getChannel();
    -        client.channelConnected(channel);
    +    public void channelActive(ChannelHandlerContext ctx) {
    +        Channel channel = ctx.channel();
     
    +        LOG.info("Connection established from " + channel.localAddress()
    +            + " to " + channel.remoteAddress());
             try {
    -            SaslNettyClient saslNettyClient = 
SaslNettyClientState.getSaslNettyClient
    -                .get(channel);
    +            SaslNettyClient saslNettyClient = 
channel.attr(SaslNettyClientState.SASL_NETTY_CLIENT).get();
     
                 if (saslNettyClient == null) {
                     LOG.debug("Creating saslNettyClient now " + "for channel: "
                               + channel);
                     saslNettyClient = new SaslNettyClient(name, token);
    -                SaslNettyClientState.getSaslNettyClient.set(channel,
    -                                                            
saslNettyClient);
    +                
channel.attr(SaslNettyClientState.SASL_NETTY_CLIENT).set(saslNettyClient);
                 }
                 LOG.debug("Sending SASL_TOKEN_MESSAGE_REQUEST");
    -            channel.write(ControlMessage.SASL_TOKEN_MESSAGE_REQUEST);
    +            
channel.writeAndFlush(ControlMessage.SASL_TOKEN_MESSAGE_REQUEST, 
channel.voidPromise());
             } catch (Exception e) {
                 LOG.error("Failed to authenticate with server " + "due to 
error: ",
                           e);
             }
         }
     
         @Override
    -    public void messageReceived(ChannelHandlerContext ctx, MessageEvent 
event)
    -        throws Exception {
    +    public void channelRead(ChannelHandlerContext ctx, Object message) 
throws Exception {
             LOG.debug("send/recv time (ms): {}",
                       (System.currentTimeMillis() - start_time));
     
    -        Channel channel = ctx.getChannel();
    -
    +        // examine the response message from server
    +        if (message instanceof ControlMessage) {
    +            handleControlMessage(ctx, (ControlMessage) message);
    +        } else if (message instanceof SaslMessageToken) {
    +            handleSaslMessageToken(ctx, (SaslMessageToken) message);
    +        } else {
    +            LOG.error("Unexpected message from server: {}", message);
    +        }
    +    }
    +    
    +    private SaslNettyClient getChannelSaslNettyClient(Channel channel) 
throws Exception {
             // Generate SASL response to server using Channel-local SASL 
client.
    -        SaslNettyClient saslNettyClient = 
SaslNettyClientState.getSaslNettyClient
    -            .get(channel);
    +        SaslNettyClient saslNettyClient = 
channel.attr(SaslNettyClientState.SASL_NETTY_CLIENT).get();
             if (saslNettyClient == null) {
                 throw new Exception("saslNettyClient was unexpectedly "
    -                                + "null for channel: " + channel);
    +                + "null for channel: " + channel);
             }
    +        return saslNettyClient;
    +    }
    +    
    +    private void handleControlMessage(ChannelHandlerContext ctx, 
ControlMessage controlMessage) throws Exception {
    +        SaslNettyClient saslNettyClient = 
getChannelSaslNettyClient(ctx.channel());
    +        if (controlMessage == ControlMessage.SASL_COMPLETE_REQUEST) {
    +            LOG.debug("Server has sent us the SaslComplete "
    +                + "message. Allowing normal work to proceed.");
     
    -        // examine the response message from server
    -        if (event.getMessage() instanceof ControlMessage) {
    -            ControlMessage msg = (ControlMessage) event.getMessage();
    -            if (msg == ControlMessage.SASL_COMPLETE_REQUEST) {
    -                LOG.debug("Server has sent us the SaslComplete "
    -                          + "message. Allowing normal work to proceed.");
    -
    -                if (!saslNettyClient.isComplete()) {
    -                    LOG.error("Server returned a Sasl-complete message, "
    -                              + "but as far as we can tell, we are not 
authenticated yet.");
    -                    throw new Exception("Server returned a "
    -                                        + "Sasl-complete message, but as 
far as "
    -                                        + "we can tell, we are not 
authenticated yet.");
    -                }
    -                ctx.getPipeline().remove(this);
    -                this.client.channelReady();
    -
    -                // We call fireMessageReceived since the client is allowed 
to
    -                // perform this request. The client's request will now 
proceed
    -                // to the next pipeline component namely 
StormClientHandler.
    -                Channels.fireMessageReceived(ctx, msg);
    -                return;
    +            if (!saslNettyClient.isComplete()) {
    +                LOG.error("Server returned a Sasl-complete message, "
    +                    + "but as far as we can tell, we are not authenticated 
yet.");
    +                throw new Exception("Server returned a "
    +                    + "Sasl-complete message, but as far as "
    +                    + "we can tell, we are not authenticated yet.");
                 }
    +            ctx.pipeline().remove(this);
    +            this.client.channelReady(ctx.channel());
    +
    +            // We call fireMessageRead since the client is allowed to
    +            // perform this request. The client's request will now proceed
    +            // to the next pipeline component namely StormClientHandler.
    +            ctx.fireChannelRead(controlMessage);
    +        } else {
    +            LOG.warn("Unexpected control message: {}", controlMessage);
             }
    -        SaslMessageToken saslTokenMessage = (SaslMessageToken) event
    -            .getMessage();
    +    }
    +    
    +    private void handleSaslMessageToken(ChannelHandlerContext ctx, 
SaslMessageToken saslMessageToken) throws Exception {
    +        Channel channel = ctx.channel();
    +        SaslNettyClient saslNettyClient = 
getChannelSaslNettyClient(channel);
             LOG.debug("Responding to server's token of length: "
    -                  + saslTokenMessage.getSaslToken().length);
    +                  + saslMessageToken.getSaslToken().length);
    --- End diff --
    
    nit: indentation (of the continuation line just above)


---

Reply via email to