Github user danny0405 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2704#discussion_r193434753
--- Diff:
storm-client/src/jvm/org/apache/storm/messaging/netty/SaslStormClientHandler.java
---
@@ -41,80 +38,88 @@ public SaslStormClientHandler(ISaslClient client)
throws IOException {
}
@Override
- public void channelConnected(ChannelHandlerContext ctx,
- ChannelStateEvent event) {
- // register the newly established channel
- Channel channel = ctx.getChannel();
- client.channelConnected(channel);
+ public void channelActive(ChannelHandlerContext ctx) {
+ Channel channel = ctx.channel();
+ LOG.info("Connection established from " + channel.localAddress()
+ + " to " + channel.remoteAddress());
try {
- SaslNettyClient saslNettyClient =
SaslNettyClientState.getSaslNettyClient
- .get(channel);
+ SaslNettyClient saslNettyClient =
channel.attr(SaslNettyClientState.SASL_NETTY_CLIENT).get();
if (saslNettyClient == null) {
LOG.debug("Creating saslNettyClient now " + "for channel: "
+ channel);
saslNettyClient = new SaslNettyClient(name, token);
- SaslNettyClientState.getSaslNettyClient.set(channel,
-
saslNettyClient);
+
channel.attr(SaslNettyClientState.SASL_NETTY_CLIENT).set(saslNettyClient);
}
LOG.debug("Sending SASL_TOKEN_MESSAGE_REQUEST");
- channel.write(ControlMessage.SASL_TOKEN_MESSAGE_REQUEST);
+
channel.writeAndFlush(ControlMessage.SASL_TOKEN_MESSAGE_REQUEST,
channel.voidPromise());
} catch (Exception e) {
LOG.error("Failed to authenticate with server " + "due to
error: ",
e);
}
}
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent
event)
- throws Exception {
+ public void channelRead(ChannelHandlerContext ctx, Object message)
throws Exception {
LOG.debug("send/recv time (ms): {}",
(System.currentTimeMillis() - start_time));
- Channel channel = ctx.getChannel();
-
+ // examine the response message from server
+ if (message instanceof ControlMessage) {
+ handleControlMessage(ctx, (ControlMessage) message);
+ } else if (message instanceof SaslMessageToken) {
+ handleSaslMessageToken(ctx, (SaslMessageToken) message);
+ } else {
+ LOG.error("Unexpected message from server: {}", message);
+ }
+ }
+
+ private SaslNettyClient getChannelSaslNettyClient(Channel channel)
throws Exception {
// Generate SASL response to server using Channel-local SASL
client.
- SaslNettyClient saslNettyClient =
SaslNettyClientState.getSaslNettyClient
- .get(channel);
+ SaslNettyClient saslNettyClient =
channel.attr(SaslNettyClientState.SASL_NETTY_CLIENT).get();
if (saslNettyClient == null) {
throw new Exception("saslNettyClient was unexpectedly "
- + "null for channel: " + channel);
+ + "null for channel: " + channel);
}
+ return saslNettyClient;
+ }
+
+ private void handleControlMessage(ChannelHandlerContext ctx,
ControlMessage controlMessage) throws Exception {
+ SaslNettyClient saslNettyClient =
getChannelSaslNettyClient(ctx.channel());
+ if (controlMessage == ControlMessage.SASL_COMPLETE_REQUEST) {
+ LOG.debug("Server has sent us the SaslComplete "
+ + "message. Allowing normal work to proceed.");
- // examine the response message from server
- if (event.getMessage() instanceof ControlMessage) {
- ControlMessage msg = (ControlMessage) event.getMessage();
- if (msg == ControlMessage.SASL_COMPLETE_REQUEST) {
- LOG.debug("Server has sent us the SaslComplete "
- + "message. Allowing normal work to proceed.");
-
- if (!saslNettyClient.isComplete()) {
- LOG.error("Server returned a Sasl-complete message, "
- + "but as far as we can tell, we are not
authenticated yet.");
- throw new Exception("Server returned a "
- + "Sasl-complete message, but as
far as "
- + "we can tell, we are not
authenticated yet.");
- }
- ctx.getPipeline().remove(this);
- this.client.channelReady();
-
- // We call fireMessageReceived since the client is allowed
to
- // perform this request. The client's request will now
proceed
- // to the next pipeline component namely
StormClientHandler.
- Channels.fireMessageReceived(ctx, msg);
- return;
+ if (!saslNettyClient.isComplete()) {
+ LOG.error("Server returned a Sasl-complete message, "
+ + "but as far as we can tell, we are not authenticated
yet.");
+ throw new Exception("Server returned a "
+ + "Sasl-complete message, but as far as "
+ + "we can tell, we are not authenticated yet.");
}
+ ctx.pipeline().remove(this);
+ this.client.channelReady(ctx.channel());
+
+ // We call fireMessageRead since the client is allowed to
+ // perform this request. The client's request will now proceed
+ // to the next pipeline component namely StormClientHandler.
+ ctx.fireChannelRead(controlMessage);
+ } else {
+ LOG.warn("Unexpected control message: {}", controlMessage);
}
- SaslMessageToken saslTokenMessage = (SaslMessageToken) event
- .getMessage();
+ }
+
+ private void handleSaslMessageToken(ChannelHandlerContext ctx,
SaslMessageToken saslMessageToken) throws Exception {
+ Channel channel = ctx.channel();
+ SaslNettyClient saslNettyClient =
getChannelSaslNettyClient(channel);
LOG.debug("Responding to server's token of length: "
- + saslTokenMessage.getSaslToken().length);
+ + saslMessageToken.getSaslToken().length);
--- End diff --
nit: indent
---