[ 
https://issues.apache.org/jira/browse/STORM-885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14990455#comment-14990455
 ] 

ASF GitHub Bot commented on STORM-885:
--------------------------------------

Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/838#discussion_r43941560
  
    --- Diff: 
storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslClientHandler.java
 ---
    @@ -0,0 +1,154 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.messaging.netty;
    +
    +import java.io.IOException;
    +import java.util.Map;
    +import org.jboss.netty.channel.Channel;
    +import org.jboss.netty.channel.ChannelHandlerContext;
    +import org.jboss.netty.channel.ChannelStateEvent;
    +import org.jboss.netty.channel.Channels;
    +import org.jboss.netty.channel.MessageEvent;
    +import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class KerberosSaslClientHandler extends 
SimpleChannelUpstreamHandler {
    +
    +    private static final Logger LOG = LoggerFactory
    +            .getLogger(KerberosSaslClientHandler.class);
    +    private ISaslClient client;
    +    long start_time;
    +    /** Used for client or server's token to send or receive from each 
other. */
    +    private Map storm_conf;
    +    private String jaas_section;
    +
    +    public KerberosSaslClientHandler(ISaslClient client, Map storm_conf, 
String jaas_section) throws IOException {
    +        this.client = client;
    +        this.storm_conf = storm_conf;
    +        this.jaas_section = jaas_section;
    +        start_time = System.currentTimeMillis();
    +    }
    +
    +    @Override
    +    public void channelConnected(ChannelHandlerContext ctx,
    +            ChannelStateEvent event) {
    +        // register the newly established channel
    +        Channel channel = ctx.getChannel();
    +        client.channelConnected(channel);
    +
    +        LOG.info("Connection established from {} to {}",
    +                 channel.getLocalAddress(), channel.getRemoteAddress());
    +
    +        try {
    +            KerberosSaslNettyClient saslNettyClient = 
KerberosSaslNettyClientState.getKerberosSaslNettyClient
    +                    .get(channel);
    +
    +            if (saslNettyClient == null) {
    +                LOG.debug("Creating saslNettyClient now for channel: {}",
    +                          channel);
    +                saslNettyClient = new KerberosSaslNettyClient(storm_conf, 
jaas_section);
    +                
KerberosSaslNettyClientState.getKerberosSaslNettyClient.set(channel,
    +                        saslNettyClient);
    +            }
    +            LOG.debug("Going to initiate Kerberos negotiations.");
    +            byte[] initialChallenge = saslNettyClient.saslResponse(new 
SaslMessageToken(new byte[0]));
    +            LOG.debug("Sending initial challenge: {}", initialChallenge);
    +            channel.write(new SaslMessageToken(initialChallenge));
    +        } catch (Exception e) {
    +            LOG.error("Failed to authenticate with server due to error: ",
    +                      e);
    +        }
    +        return;
    +
    +    }
    +
    +    @Override
    +    public void messageReceived(ChannelHandlerContext ctx, MessageEvent 
event)
    +            throws Exception {
    +        LOG.debug("send/recv time (ms): {}",
    +                (System.currentTimeMillis() - start_time));
    +
    +        Channel channel = ctx.getChannel();
    +
    +        // Generate SASL response to server using Channel-local SASL 
client.
    +        KerberosSaslNettyClient saslNettyClient = 
KerberosSaslNettyClientState.getKerberosSaslNettyClient
    +                .get(channel);
    +        if (saslNettyClient == null) {
    +            throw new Exception("saslNettyClient was unexpectedly null for 
channel:" + channel);
    +        }
    +
    +        // examine the response message from server
    +        if (event.getMessage() instanceof ControlMessage) {
    +            ControlMessage msg = (ControlMessage) event.getMessage();
    +            if (msg == ControlMessage.SASL_COMPLETE_REQUEST) {
    +                LOG.debug("Server has sent us the SaslComplete message. 
Allowing normal work to proceed.");
    +
    +                if (!saslNettyClient.isComplete()) {
    +                    String message = "Server returned a Sasl-complete 
message, but as far as we can tell, we are not authenticated yet.";
    +                    LOG.error(message);
    +                    throw new Exception(message);
    +                }
    +                ctx.getPipeline().remove(this);
    +                this.client.channelReady();
    +
    +                // We call fireMessageReceived since the client is allowed 
to
    +                // perform this request. The client's request will now 
proceed
    +                // to the next pipeline component namely 
StormClientHandler.
    +                Channels.fireMessageReceived(ctx, msg);
    +            }
    +            else {
    +                LOG.warn("Unexpected control message: {}", msg);
    +            }
    +            return;
    +        }
    +        else if (event.getMessage() instanceof SaslMessageToken) {
    +            SaslMessageToken saslTokenMessage = (SaslMessageToken) event
    +                .getMessage();
    +            LOG.debug("Responding to server's token of length: {}",
    +                      saslTokenMessage.getSaslToken().length);
    +
    +            // Generate SASL response (but we only actually send the 
response if
    +            // it's non-null.
    +            byte[] responseToServer = saslNettyClient
    +                .saslResponse(saslTokenMessage);
    +            if (responseToServer == null) {
    +                // If we generate a null response, then authentication has 
completed
    +                // (if not, warn), and return without sending a response 
back to the
    +                // server.
    +                LOG.debug("Response to server is null: authentication 
should now be complete.");
    +                if (!saslNettyClient.isComplete()) {
    +                    LOG.warn("Generated a null response, but 
authentication is not complete.");
    +                    throw new Exception("Our reponse to the server is 
null, but as far as we can tell, we are not authenticated yet.");
    +                }
    +                this.client.channelReady();
    +                return;
    +            } else {
    +                LOG.debug("Response to server token has length: {}",
    +                          responseToServer.length);
    +            }
    +            // Construct a message containing the SASL response and send 
it to the
    +            // server.
    +            SaslMessageToken saslResponse = new 
SaslMessageToken(responseToServer);
    +            channel.write(saslResponse);
    +        }
    +        else {
    --- End diff --
    
    minor nit I thought the style guidelines had all of the else and else if 
clauses on the same line as the previous '}'


> Heartbeat Server (Pacemaker)
> ----------------------------
>
>                 Key: STORM-885
>                 URL: https://issues.apache.org/jira/browse/STORM-885
>             Project: Apache Storm
>          Issue Type: Improvement
>          Components: storm-core
>            Reporter: Robert Joseph Evans
>            Assignee: Kyle Nusbaum
>
> Large highly connected topologies and large clusters write a lot of data into 
> ZooKeeper.  The heartbeats, that make up the majority of this data, do not 
> need to be persisted to disk.  Pacemaker is intended to be a secure 
> replacement for storing the heartbeats without changing anything within the 
> heartbeats.  In the future as more metrics are added in, we may want to look 
> into switching it over to look more like Heron, where a metrics server is 
> running for each node/topology.  And can be used to aggregate/per-aggregate 
> them in a more scalable manor.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to