[ https://issues.apache.org/jira/browse/STORM-885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14990455#comment-14990455 ]
ASF GitHub Bot commented on STORM-885: -------------------------------------- Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/838#discussion_r43941560 --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslClientHandler.java --- @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package backtype.storm.messaging.netty; + +import java.io.IOException; +import java.util.Map; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ChannelStateEvent; +import org.jboss.netty.channel.Channels; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.SimpleChannelUpstreamHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KerberosSaslClientHandler extends SimpleChannelUpstreamHandler { + + private static final Logger LOG = LoggerFactory + .getLogger(KerberosSaslClientHandler.class); + private ISaslClient client; + long start_time; + /** Used for client or server's token to send or receive from each other. */ + private Map storm_conf; + private String jaas_section; + + public KerberosSaslClientHandler(ISaslClient client, Map storm_conf, String jaas_section) throws IOException { + this.client = client; + this.storm_conf = storm_conf; + this.jaas_section = jaas_section; + start_time = System.currentTimeMillis(); + } + + @Override + public void channelConnected(ChannelHandlerContext ctx, + ChannelStateEvent event) { + // register the newly established channel + Channel channel = ctx.getChannel(); + client.channelConnected(channel); + + LOG.info("Connection established from {} to {}", + channel.getLocalAddress(), channel.getRemoteAddress()); + + try { + KerberosSaslNettyClient saslNettyClient = KerberosSaslNettyClientState.getKerberosSaslNettyClient + .get(channel); + + if (saslNettyClient == null) { + LOG.debug("Creating saslNettyClient now for channel: {}", + channel); + saslNettyClient = new KerberosSaslNettyClient(storm_conf, jaas_section); + KerberosSaslNettyClientState.getKerberosSaslNettyClient.set(channel, + saslNettyClient); + } + LOG.debug("Going to initiate Kerberos negotiations."); + byte[] initialChallenge = saslNettyClient.saslResponse(new SaslMessageToken(new byte[0])); + LOG.debug("Sending initial challenge: {}", initialChallenge); + channel.write(new SaslMessageToken(initialChallenge)); + } catch (Exception e) { + LOG.error("Failed to authenticate with server due to error: ", + e); + } + return; + + } + + @Override + public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) + throws Exception { + LOG.debug("send/recv time (ms): {}", + (System.currentTimeMillis() - start_time)); + + Channel channel = ctx.getChannel(); + + // Generate SASL response to server using Channel-local SASL client. + KerberosSaslNettyClient saslNettyClient = KerberosSaslNettyClientState.getKerberosSaslNettyClient + .get(channel); + if (saslNettyClient == null) { + throw new Exception("saslNettyClient was unexpectedly null for channel:" + channel); + } + + // examine the response message from server + if (event.getMessage() instanceof ControlMessage) { + ControlMessage msg = (ControlMessage) event.getMessage(); + if (msg == ControlMessage.SASL_COMPLETE_REQUEST) { + LOG.debug("Server has sent us the SaslComplete message. Allowing normal work to proceed."); + + if (!saslNettyClient.isComplete()) { + String message = "Server returned a Sasl-complete message, but as far as we can tell, we are not authenticated yet."; + LOG.error(message); + throw new Exception(message); + } + ctx.getPipeline().remove(this); + this.client.channelReady(); + + // We call fireMessageReceived since the client is allowed to + // perform this request. The client's request will now proceed + // to the next pipeline component namely StormClientHandler. + Channels.fireMessageReceived(ctx, msg); + } + else { + LOG.warn("Unexpected control message: {}", msg); + } + return; + } + else if (event.getMessage() instanceof SaslMessageToken) { + SaslMessageToken saslTokenMessage = (SaslMessageToken) event + .getMessage(); + LOG.debug("Responding to server's token of length: {}", + saslTokenMessage.getSaslToken().length); + + // Generate SASL response (but we only actually send the response if + // it's non-null. + byte[] responseToServer = saslNettyClient + .saslResponse(saslTokenMessage); + if (responseToServer == null) { + // If we generate a null response, then authentication has completed + // (if not, warn), and return without sending a response back to the + // server. + LOG.debug("Response to server is null: authentication should now be complete."); + if (!saslNettyClient.isComplete()) { + LOG.warn("Generated a null response, but authentication is not complete."); + throw new Exception("Our reponse to the server is null, but as far as we can tell, we are not authenticated yet."); + } + this.client.channelReady(); + return; + } else { + LOG.debug("Response to server token has length: {}", + responseToServer.length); + } + // Construct a message containing the SASL response and send it to the + // server. + SaslMessageToken saslResponse = new SaslMessageToken(responseToServer); + channel.write(saslResponse); + } + else { --- End diff -- minor nit I thought the style guidelines had all of the else and else if clauses on the same line as the previous '}' > Heartbeat Server (Pacemaker) > ---------------------------- > > Key: STORM-885 > URL: https://issues.apache.org/jira/browse/STORM-885 > Project: Apache Storm > Issue Type: Improvement > Components: storm-core > Reporter: Robert Joseph Evans > Assignee: Kyle Nusbaum > > Large highly connected topologies and large clusters write a lot of data into > ZooKeeper. The heartbeats, that make up the majority of this data, do not > need to be persisted to disk. Pacemaker is intended to be a secure > replacement for storing the heartbeats without changing anything within the > heartbeats. In the future as more metrics are added in, we may want to look > into switching it over to look more like Heron, where a metrics server is > running for each node/topology. And can be used to aggregate/per-aggregate > them in a more scalable manor. -- This message was sent by Atlassian JIRA (v6.3.4#6332)