http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index 1f925f9..89c6e6c 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -18,11 +18,12 @@
 package org.apache.kafka.common.security.authenticator;
 
 import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
-
+import java.util.Set;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
-
 import java.security.Principal;
 import java.security.PrivilegedActionException;
 import java.security.PrivilegedExceptionAction;
@@ -44,51 +45,102 @@ import org.ietf.jgss.GSSName;
 import org.ietf.jgss.Oid;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
+import org.apache.kafka.common.security.auth.AuthCallbackHandler;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.IllegalSaslStateException;
+import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
 import org.apache.kafka.common.network.Authenticator;
+import org.apache.kafka.common.network.Mode;
 import org.apache.kafka.common.network.NetworkSend;
 import org.apache.kafka.common.network.NetworkReceive;
 import org.apache.kafka.common.network.TransportLayer;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractRequestResponse;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.requests.ResponseHeader;
+import org.apache.kafka.common.requests.ResponseSend;
+import org.apache.kafka.common.requests.SaslHandshakeRequest;
+import org.apache.kafka.common.requests.SaslHandshakeResponse;
 import org.apache.kafka.common.security.auth.PrincipalBuilder;
 
 public class SaslServerAuthenticator implements Authenticator {
 
     private static final Logger LOG = LoggerFactory.getLogger(SaslServerAuthenticator.class);
 
-    private final SaslServer saslServer;
-    private final Subject subject;
+    public enum SaslState {
+        HANDSHAKE_REQUEST, AUTHENTICATE, COMPLETE, FAILED
+    }
+
     private final String node;
+    private final Subject subject;
     private final KerberosShortNamer kerberosNamer;
     private final int maxReceiveSize;
+    private final String host;
+
+    // Current SASL state
+    private SaslState saslState = SaslState.HANDSHAKE_REQUEST;
+    // Next SASL state to be set when outgoing writes associated with the current SASL state complete
+    private SaslState pendingSaslState = null;
+    private SaslServer saslServer;
+    private String saslMechanism;
+    private AuthCallbackHandler callbackHandler;
 
     // assigned in `configure`
     private TransportLayer transportLayer;
+    private Set<String> enabledMechanisms;
+    private Map<String, ?> configs;
 
     // buffers used in `authenticate`
     private NetworkReceive netInBuffer;
     private NetworkSend netOutBuffer;
 
-    public SaslServerAuthenticator(String node, final Subject subject, KerberosShortNamer kerberosNameParser, int maxReceiveSize) throws IOException {
+    public SaslServerAuthenticator(String node, final Subject subject, KerberosShortNamer kerberosNameParser, String host, int maxReceiveSize) throws IOException {
         if (subject == null)
             throw new IllegalArgumentException("subject cannot be null");
-        if (subject.getPrincipals().isEmpty())
-            throw new IllegalArgumentException("subject must have at least one principal");
         this.node = node;
         this.subject = subject;
         this.kerberosNamer = kerberosNameParser;
         this.maxReceiveSize = maxReceiveSize;
-        saslServer = createSaslServer();
+        this.host = host;
     }
 
     public void configure(TransportLayer transportLayer, PrincipalBuilder principalBuilder, Map<String, ?> configs) {
         this.transportLayer = transportLayer;
+        this.configs = configs;
+        List<String> enabledMechanisms = (List<String>) this.configs.get(SaslConfigs.SASL_ENABLED_MECHANISMS);
+        if (enabledMechanisms == null || enabledMechanisms.isEmpty())
+            throw new IllegalArgumentException("No SASL mechanisms are enabled");
+        this.enabledMechanisms = new HashSet<>(enabledMechanisms);
     }
 
-    private SaslServer createSaslServer() throws IOException {
+    private void createSaslServer(String mechanism) throws IOException {
+        this.saslMechanism = mechanism;
+        callbackHandler = new SaslServerCallbackHandler(Configuration.getConfiguration(), kerberosNamer);
+        callbackHandler.configure(configs, Mode.SERVER, subject, saslMechanism);
+        if (mechanism.equals(SaslConfigs.GSSAPI_MECHANISM)) {
+            if (subject.getPrincipals().isEmpty())
+                throw new IllegalArgumentException("subject must have at least one principal");
+            saslServer = createSaslKerberosServer(callbackHandler, configs);
+        } else {
+            try {
+                saslServer = Subject.doAs(subject, new PrivilegedExceptionAction<SaslServer>() {
+                    public SaslServer run() throws SaslException {
+                        return Sasl.createSaslServer(saslMechanism, "kafka", host, configs, callbackHandler);
+                    }
+                });
+            } catch (PrivilegedActionException e) {
+                throw new SaslException("Kafka Server failed to create a SaslServer to interact with a client during session authentication", e.getCause());
+            }
+        }
+    }
+
+    private SaslServer createSaslKerberosServer(final AuthCallbackHandler saslServerCallbackHandler, final Map<String, ?> configs) throws IOException {
         // server is using a JAAS-authenticated subject: determine service principal name and hostname from kafka server's subject.
-        final SaslServerCallbackHandler saslServerCallbackHandler = new SaslServerCallbackHandler(
-                Configuration.getConfiguration(), kerberosNamer);
         final Principal servicePrincipal = subject.getPrincipals().iterator().next();
         KerberosName kerberosName;
         try {
@@ -99,9 +151,7 @@ public class SaslServerAuthenticator implements Authenticator {
         final String servicePrincipalName = kerberosName.serviceName();
         final String serviceHostname = kerberosName.hostName();
 
-        final String mech = "GSSAPI";
-
-        LOG.debug("Creating SaslServer for {} with mechanism {}", kerberosName, mech);
+        LOG.debug("Creating SaslServer for {} with mechanism {}", kerberosName, saslMechanism);
 
         // As described in http://docs.oracle.com/javase/8/docs/technotes/guides/security/jgss/jgss-features.html:
         // "To enable Java GSS to delegate to the native GSS library and its list of native mechanisms,
@@ -127,7 +177,7 @@ public class SaslServerAuthenticator implements Authenticator {
         try {
             return Subject.doAs(subject, new PrivilegedExceptionAction<SaslServer>() {
                 public SaslServer run() throws SaslException {
-                    return Sasl.createSaslServer(mech, servicePrincipalName, serviceHostname, null, saslServerCallbackHandler);
+                    return Sasl.createSaslServer(saslMechanism, servicePrincipalName, serviceHostname, configs, saslServerCallbackHandler);
                 }
             });
         } catch (PrivilegedActionException e) {
@@ -146,8 +196,8 @@ public class SaslServerAuthenticator implements Authenticator {
         if (netOutBuffer != null && !flushNetOutBufferAndUpdateInterestOps())
             return;
 
-        if (saslServer.isComplete()) {
-            transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
+        if (saslServer != null && saslServer.isComplete()) {
+            setSaslState(SaslState.COMPLETE);
             return;
         }
 
@@ -161,12 +211,28 @@ public class SaslServerAuthenticator implements Authenticator {
             netInBuffer.payload().get(clientToken, 0, clientToken.length);
             netInBuffer = null; // reset the networkReceive as we read all the data.
             try {
-                byte[] response = saslServer.evaluateResponse(clientToken);
-                if (response != null) {
-                    netOutBuffer = new NetworkSend(node, ByteBuffer.wrap(response));
-                    flushNetOutBufferAndUpdateInterestOps();
+                switch (saslState) {
+                    case HANDSHAKE_REQUEST:
+                        if (handleKafkaRequest(clientToken))
+                            break;
+                        // For default GSSAPI, fall through to authenticate using the client token as the first GSSAPI packet.
+                        // This is required for interoperability with 0.9.0.x clients which do not send handshake request
+                    case AUTHENTICATE:
+                        byte[] response = saslServer.evaluateResponse(clientToken);
+                        if (response != null) {
+                            netOutBuffer = new NetworkSend(node, ByteBuffer.wrap(response));
+                            flushNetOutBufferAndUpdateInterestOps();
+                        }
+                        // When the authentication exchange is complete and no more tokens are expected from the client,
+                        // update SASL state. Current SASL state will be updated when outgoing writes to the client complete.
+                        if (saslServer.isComplete())
+                            setSaslState(SaslState.COMPLETE);
+                        break;
+                    default:
+                        break;
                 }
             } catch (Exception e) {
+                setSaslState(SaslState.FAILED);
                 throw new IOException(e);
             }
         }
@@ -177,18 +243,33 @@ public class SaslServerAuthenticator implements Authenticator {
     }
 
     public boolean complete() {
-        return saslServer.isComplete();
+        return saslState == SaslState.COMPLETE;
     }
 
     public void close() throws IOException {
-        saslServer.dispose();
+        if (saslServer != null)
+            saslServer.dispose();
+        if (callbackHandler != null)
+            callbackHandler.close();
+    }
+
+    private void setSaslState(SaslState saslState) {
+        if (netOutBuffer != null && !netOutBuffer.completed())
+            pendingSaslState = saslState;
+        else {
+            this.pendingSaslState = null;
+            this.saslState = saslState;
+            LOG.debug("Set SASL server state to {}", saslState);
+        }
     }
 
     private boolean flushNetOutBufferAndUpdateInterestOps() throws IOException {
         boolean flushedCompletely = flushNetOutBuffer();
-        if (flushedCompletely)
+        if (flushedCompletely) {
             transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
-        else
+            if (pendingSaslState != null)
+                setSaslState(pendingSaslState);
+        } else
             transportLayer.addInterestOps(SelectionKey.OP_WRITE);
         return flushedCompletely;
     }
@@ -198,4 +279,66 @@ public class SaslServerAuthenticator implements Authenticator {
             netOutBuffer.writeTo(transportLayer);
         return netOutBuffer.completed();
     }
+
+    private boolean handleKafkaRequest(byte[] requestBytes) throws IOException, AuthenticationException {
+        boolean isKafkaRequest = false;
+        String clientMechanism = null;
+        try {
+            ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
+            RequestHeader requestHeader = RequestHeader.parse(requestBuffer);
+            AbstractRequest request = AbstractRequest.getRequest(requestHeader.apiKey(), requestHeader.apiVersion(), requestBuffer);
+            isKafkaRequest = true;
+
+            ApiKeys apiKey = ApiKeys.forId(requestHeader.apiKey());
+            switch (apiKey) {
+                case SASL_HANDSHAKE:
+                    clientMechanism = handleHandshakeRequest(requestHeader, (SaslHandshakeRequest) request);
+                    break;
+                default:
+                    throw new IllegalSaslStateException("Unexpected Kafka request of type " + apiKey + " during SASL handshake.");
+            }
+        } catch (SchemaException | IllegalArgumentException e) {
+            // SchemaException is thrown if the request is not in Kafka format. IllegalArgumentException is thrown
+            // if the API key is invalid. For compatibility with 0.9.0.x where the first packet is a GSSAPI token
+            // starting with 0x60, revert to GSSAPI for both these exceptions.
+            if (LOG.isDebugEnabled()) {
+                StringBuilder tokenBuilder = new StringBuilder();
+                for (byte b : requestBytes) {
+                    tokenBuilder.append(String.format("%02x", b));
+                    if (tokenBuilder.length() >= 20)
+                        break;
+                }
+                LOG.debug("Received client packet of length {} starting with bytes 0x{}, process as GSSAPI packet", requestBytes.length, tokenBuilder);
+            }
+            if (enabledMechanisms.contains(SaslConfigs.GSSAPI_MECHANISM)) {
+                LOG.debug("First client packet is not a SASL mechanism request, using default mechanism GSSAPI");
+                clientMechanism = SaslConfigs.GSSAPI_MECHANISM;
+            } else
+                throw new UnsupportedSaslMechanismException("Exception handling first SASL packet from client, GSSAPI is not supported by server", e);
+        }
+        if (clientMechanism != null) {
+            createSaslServer(clientMechanism);
+            setSaslState(SaslState.AUTHENTICATE);
+        }
+        return isKafkaRequest;
+    }
+
+    private String handleHandshakeRequest(RequestHeader requestHeader, SaslHandshakeRequest handshakeRequest) throws IOException, UnsupportedSaslMechanismException {
+        String clientMechanism = handshakeRequest.mechanism();
+        if (enabledMechanisms.contains(clientMechanism)) {
+            LOG.debug("Using SASL mechanism '{}' provided by client", clientMechanism);
+            sendKafkaResponse(requestHeader, new SaslHandshakeResponse((short) 0, enabledMechanisms));
+            return clientMechanism;
+        } else {
+            LOG.debug("SASL mechanism '{}' requested by client is not supported", clientMechanism);
+            sendKafkaResponse(requestHeader, new SaslHandshakeResponse(Errors.UNSUPPORTED_SASL_MECHANISM.code(), enabledMechanisms));
+            throw new UnsupportedSaslMechanismException("Unsupported SASL mechanism " + clientMechanism);
+        }
+    }
+
+    private void sendKafkaResponse(RequestHeader requestHeader, AbstractRequestResponse response) throws IOException {
+        ResponseHeader responseHeader = new ResponseHeader(requestHeader.correlationId());
+        netOutBuffer = new NetworkSend(node, ResponseSend.serialize(responseHeader, response.toStruct()));
+        flushNetOutBufferAndUpdateInterestOps();
+    }
 }
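
As an aside for readers of this patch, here is a minimal sketch (not part of the commit) of the state flow that the reworked SaslServerAuthenticator implements: the first client packet is either a Kafka SaslHandshakeRequest selecting the mechanism, or a raw GSSAPI token from a 0.9.0.x client, after which tokens are fed to the SaslServer until the exchange completes. The helper looksLikeKafkaHandshake() is hypothetical; the real code parses the Kafka request header.

    import javax.security.sasl.SaslException;
    import javax.security.sasl.SaslServer;

    final class SaslFlowSketch {
        enum State { HANDSHAKE_REQUEST, AUTHENTICATE, COMPLETE, FAILED }

        private State state = State.HANDSHAKE_REQUEST;
        private final SaslServer saslServer; // created once the mechanism is known

        SaslFlowSketch(SaslServer saslServer) {
            this.saslServer = saslServer;
        }

        /** Returns the token to send back to the client, or null if there is none. */
        byte[] onClientToken(byte[] token) throws SaslException {
            try {
                if (state == State.HANDSHAKE_REQUEST && looksLikeKafkaHandshake(token)) {
                    // A SaslHandshakeRequest selects the mechanism; the server replies with
                    // its supported mechanisms and moves on to authentication.
                    state = State.AUTHENTICATE;
                    return null;
                }
                // Either the handshake already happened, or this is a raw GSSAPI token
                // from a 0.9.0.x client: evaluate it directly.
                state = State.AUTHENTICATE;
                byte[] response = saslServer.evaluateResponse(token);
                if (saslServer.isComplete())
                    state = State.COMPLETE;
                return response;
            } catch (SaslException e) {
                state = State.FAILED;
                throw e;
            }
        }

        private boolean looksLikeKafkaHandshake(byte[] token) {
            // Hypothetical shortcut: GSSAPI tokens from old clients start with 0x60,
            // anything else is treated as a Kafka request here.
            return token.length > 0 && token[0] != 0x60;
        }
    }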

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerCallbackHandler.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerCallbackHandler.java
index 1de4a2e..c23e390 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerCallbackHandler.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerCallbackHandler.java
@@ -19,12 +19,15 @@
 package org.apache.kafka.common.security.authenticator;
 
 import java.io.IOException;
+import java.util.Map;
 
+import org.apache.kafka.common.security.auth.AuthCallbackHandler;
 import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
 import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.CallbackHandler;
 import javax.security.auth.callback.UnsupportedCallbackException;
 import javax.security.auth.login.AppConfigurationEntry;
 import javax.security.auth.login.Configuration;
@@ -32,9 +35,16 @@ import javax.security.sasl.AuthorizeCallback;
 import javax.security.sasl.RealmCallback;
 
 import org.apache.kafka.common.security.kerberos.KerberosName;
+import org.apache.kafka.common.network.Mode;
 import org.apache.kafka.common.security.JaasUtils;
 
-public class SaslServerCallbackHandler implements CallbackHandler {
+/**
+ * Callback handler for Sasl servers. The callbacks required for all the SASL
+ * mechanisms enabled in the server should be supported by this callback handler. See
+ * <a href="https://docs.oracle.com/javase/8/docs/technotes/guides/security/sasl/sasl-refguide.html">Java SASL API</a>
+ * for the list of SASL callback handlers required for each SASL mechanism.
+ */
+public class SaslServerCallbackHandler implements AuthCallbackHandler {
     private static final Logger LOG = LoggerFactory.getLogger(SaslServerCallbackHandler.class);
     private final KerberosShortNamer kerberosShortNamer;
 
@@ -45,6 +55,11 @@ public class SaslServerCallbackHandler implements CallbackHandler {
         this.kerberosShortNamer = kerberosNameParser;
     }
 
+    @Override
+    public void configure(Map<String, ?> configs, Mode mode, Subject subject, String saslMechanism) {
+    }
+
+    @Override
     public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
         for (Callback callback : callbacks) {
             if (callback instanceof RealmCallback) {
@@ -78,4 +93,7 @@ public class SaslServerCallbackHandler implements CallbackHandler {
         }
     }
 
+    @Override
+    public void close() {
+    }
 }
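
For context on the javadoc added above: a callback handler that also serves non-Kerberos mechanisms typically has to satisfy name, password and authorization callbacks. The following sketch is illustrative only (not from this commit) and uses a hard-coded credential lookup purely for demonstration, roughly along the lines of the Java SASL guide referenced in the javadoc.

    import javax.security.auth.callback.Callback;
    import javax.security.auth.callback.CallbackHandler;
    import javax.security.auth.callback.NameCallback;
    import javax.security.auth.callback.PasswordCallback;
    import javax.security.auth.callback.UnsupportedCallbackException;
    import javax.security.sasl.AuthorizeCallback;

    class PlainLikeServerCallbackHandler implements CallbackHandler {
        @Override
        public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
            for (Callback callback : callbacks) {
                if (callback instanceof NameCallback) {
                    NameCallback nc = (NameCallback) callback;
                    nc.setName(nc.getDefaultName());
                } else if (callback instanceof PasswordCallback) {
                    // A real handler would look up the expected password for the user.
                    ((PasswordCallback) callback).setPassword("secret".toCharArray());
                } else if (callback instanceof AuthorizeCallback) {
                    AuthorizeCallback ac = (AuthorizeCallback) callback;
                    // Authorize only when authentication and authorization ids match.
                    ac.setAuthorized(ac.getAuthenticationID().equals(ac.getAuthorizationID()));
                    if (ac.isAuthorized())
                        ac.setAuthorizedID(ac.getAuthorizationID());
                } else {
                    throw new UnsupportedCallbackException(callback);
                }
            }
        }
    }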

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
new file mode 100644
index 0000000..58becdf
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java
@@ -0,0 +1,392 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.kerberos;
+
+import javax.security.auth.kerberos.KerberosPrincipal;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
+import javax.security.auth.kerberos.KerberosTicket;
+import javax.security.auth.Subject;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.security.JaasUtils;
+import org.apache.kafka.common.security.authenticator.AbstractLogin;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.utils.Shell;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.Random;
+import java.util.Set;
+import java.util.Map;
+
+/**
+ * This class is responsible for refreshing Kerberos credentials for
+ * logins for both Kafka client and server.
+ */
+public class KerberosLogin extends AbstractLogin {
+    private static final Logger log = LoggerFactory.getLogger(KerberosLogin.class);
+
+    private static final Random RNG = new Random();
+
+    private final Time time = new SystemTime();
+    private Thread t;
+    private boolean isKrbTicket;
+    private boolean isUsingTicketCache;
+
+    private String loginContextName;
+    private String principal;
+
+    // LoginThread will sleep until 80% of time from last refresh to
+    // ticket's expiry has been reached, at which time it will wake
+    // and try to renew the ticket.
+    private double ticketRenewWindowFactor;
+
+    /**
+     * Percentage of random jitter added to the renewal time
+     */
+    private double ticketRenewJitter;
+
+    // Regardless of ticketRenewWindowFactor setting above and the ticket expiry time,
+    // thread will not sleep between refresh attempts any less than 1 minute (60*1000 milliseconds = 1 minute).
+    // Change the '1' to e.g. 5, to change this to 5 minutes.
+    private long minTimeBeforeRelogin;
+
+    private String kinitCmd;
+
+    private volatile Subject subject;
+
+    private LoginContext loginContext;
+    private String serviceName;
+    private long lastLogin;
+
+    /**
+     * Configure this login: {@link #login()} starts the thread used
+     * to periodically re-login to the Kerberos Ticket Granting Server.
+     * @param loginContextName
+     *               name of section in JAAS file that will be used to login.
+     *               Passed as first param to javax.security.auth.login.LoginContext().
+     * @param configs configure Login with the given key-value pairs.
+     * @throws javax.security.auth.login.LoginException
+     *               Thrown if authentication fails.
+     */
+    public void configure(Map<String, ?> configs, final String loginContextName) {
+        super.configure(configs, loginContextName);
+        this.loginContextName = loginContextName;
+        this.ticketRenewWindowFactor = (Double) configs.get(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR);
+        this.ticketRenewJitter = (Double) configs.get(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER);
+        this.minTimeBeforeRelogin = (Long) configs.get(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN);
+        this.kinitCmd = (String) configs.get(SaslConfigs.SASL_KERBEROS_KINIT_CMD);
+        this.serviceName = getServiceName(configs, loginContextName);
+    }
+
+    @Override
+    public LoginContext login() throws LoginException {
+
+        this.lastLogin = currentElapsedTime();
+        loginContext = super.login();
+        subject = loginContext.getSubject();
+        isKrbTicket = !subject.getPrivateCredentials(KerberosTicket.class).isEmpty();
+
+        AppConfigurationEntry[] entries = Configuration.getConfiguration().getAppConfigurationEntry(loginContextName);
+        if (entries.length == 0) {
+            isUsingTicketCache = false;
+            principal = null;
+        } else {
+            // there will only be a single entry
+            AppConfigurationEntry entry = entries[0];
+            if (entry.getOptions().get("useTicketCache") != null) {
+                String val = (String) entry.getOptions().get("useTicketCache");
+                isUsingTicketCache = val.equals("true");
+            } else
+                isUsingTicketCache = false;
+            if (entry.getOptions().get("principal") != null)
+                principal = (String) entry.getOptions().get("principal");
+            else
+                principal = null;
+        }
+
+        if (!isKrbTicket) {
+            log.debug("It is not a Kerberos ticket");
+            t = null;
+            // if no TGT, do not bother with ticket management.
+            return loginContext;
+        }
+        log.debug("It is a Kerberos ticket");
+
+        // Refresh the Ticket Granting Ticket (TGT) periodically. How often to refresh is determined by the
+        // TGT's existing expiry date and the configured minTimeBeforeRelogin. For testing and development,
+        // you can decrease the interval of expiration of tickets (for example, to 3 minutes) by running:
+        //  "modprinc -maxlife 3mins <principal>" in kadmin.
+        t = Utils.newThread("kafka-kerberos-refresh-thread", new Runnable() {
+            public void run() {
+                log.info("TGT refresh thread started.");
+                while (true) {  // renewal thread's main loop. if it exits from here, thread will exit.
+                    KerberosTicket tgt = getTGT();
+                    long now = currentWallTime();
+                    long nextRefresh;
+                    Date nextRefreshDate;
+                    if (tgt == null) {
+                        nextRefresh = now + minTimeBeforeRelogin;
+                        nextRefreshDate = new Date(nextRefresh);
+                        log.warn("No TGT found: will try again at {}", 
nextRefreshDate);
+                    } else {
+                        nextRefresh = getRefreshTime(tgt);
+                        long expiry = tgt.getEndTime().getTime();
+                        Date expiryDate = new Date(expiry);
+                        if (isUsingTicketCache && tgt.getRenewTill() != null && tgt.getRenewTill().getTime() < expiry) {
+                            log.error("The TGT cannot be renewed beyond the next expiry date: {}." +
+                                    "This process will not be able to authenticate new SASL connections after that " +
+                                    "time (for example, it will not be able to authenticate a new connection with a Kafka " +
+                                    "Broker).  Ask your system administrator to either increase the " +
+                                    "'renew until' time by doing : 'modprinc -maxrenewlife {} ' within " +
+                                    "kadmin, or instead, to generate a keytab for {}. Because the TGT's " +
+                                    "expiry cannot be further extended by refreshing, exiting refresh thread now.",
+                                    expiryDate, principal, principal);
+                            return;
+                        }
+                        // determine how long to sleep from looking at ticket's expiry.
+                        // We should not allow the ticket to expire, but we should take into consideration
+                        // minTimeBeforeRelogin. Will not sleep less than minTimeBeforeRelogin, unless doing so
+                        // would cause ticket expiration.
+                        if ((nextRefresh > expiry) || (now + minTimeBeforeRelogin > expiry)) {
+                            // expiry is before next scheduled refresh).
+                            log.info("Refreshing now because expiry is before next scheduled refresh time.");
+                            nextRefresh = now;
+                        } else {
+                            if (nextRefresh < (now + minTimeBeforeRelogin)) {
+                                // next scheduled refresh is sooner than (now + MIN_TIME_BEFORE_LOGIN).
+                                Date until = new Date(nextRefresh);
+                                Date newUntil = new Date(now + minTimeBeforeRelogin);
+                                log.warn("TGT refresh thread time adjusted from {} to {} since the former is sooner " +
+                                        "than the minimum refresh interval ({} seconds) from now.",
+                                        until, newUntil, minTimeBeforeRelogin / 1000);
+                            }
+                            nextRefresh = Math.max(nextRefresh, now + minTimeBeforeRelogin);
+                        }
+                        nextRefreshDate = new Date(nextRefresh);
+                        if (nextRefresh > expiry) {
+                            log.error("Next refresh: {} is later than expiry 
{}. This may indicate a clock skew problem." +
+                                    "Check that this host and the KDC hosts' 
clocks are in sync. Exiting refresh thread.",
+                                    nextRefreshDate, expiryDate);
+                            return;
+                        }
+                    }
+                    if (now < nextRefresh) {
+                        Date until = new Date(nextRefresh);
+                        log.info("TGT refresh sleeping until: {}", until);
+                        try {
+                            Thread.sleep(nextRefresh - now);
+                        } catch (InterruptedException ie) {
+                            log.warn("TGT renewal thread has been interrupted 
and will exit.");
+                            return;
+                        }
+                    } else {
+                        log.error("NextRefresh: {} is in the past: exiting 
refresh thread. Check"
+                                + " clock sync between this host and KDC - 
(KDC's clock is likely ahead of this host)."
+                                + " Manual intervention will be required for 
this client to successfully authenticate."
+                                + " Exiting refresh thread.", nextRefreshDate);
+                        return;
+                    }
+                    if (isUsingTicketCache) {
+                        String kinitArgs = "-R";
+                        int retry = 1;
+                        while (retry >= 0) {
+                            try {
+                                log.debug("Running ticket cache refresh 
command: {} {}", kinitCmd, kinitArgs);
+                                Shell.execCommand(kinitCmd, kinitArgs);
+                                break;
+                            } catch (Exception e) {
+                                if (retry > 0) {
+                                    --retry;
+                                    // sleep for 10 seconds
+                                    try {
+                                        Thread.sleep(10 * 1000);
+                                    } catch (InterruptedException ie) {
+                                        log.error("Interrupted while renewing 
TGT, exiting Login thread");
+                                        return;
+                                    }
+                                } else {
+                                    log.warn("Could not renew TGT due to 
problem running shell command: '" + kinitCmd
+                                            + " " + kinitArgs + "'" + "; 
exception was: " + e + ". Exiting refresh thread.", e);
+                                    return;
+                                }
+                            }
+                        }
+                    }
+                    try {
+                        int retry = 1;
+                        while (retry >= 0) {
+                            try {
+                                reLogin();
+                                break;
+                            } catch (LoginException le) {
+                                if (retry > 0) {
+                                    --retry;
+                                    // sleep for 10 seconds.
+                                    try {
+                                        Thread.sleep(10 * 1000);
+                                    } catch (InterruptedException e) {
+                                        log.error("Interrupted during login 
retry after LoginException:", le);
+                                        throw le;
+                                    }
+                                } else {
+                                    log.error("Could not refresh TGT for 
principal: " + principal + ".", le);
+                                }
+                            }
+                        }
+                    } catch (LoginException le) {
+                        log.error("Failed to refresh TGT: refresh thread 
exiting now.", le);
+                        return;
+                    }
+                }
+            }
+        }, true);
+        t.start();
+        return loginContext;
+    }
+
+    @Override
+    public void close() {
+        if ((t != null) && (t.isAlive())) {
+            t.interrupt();
+            try {
+                t.join();
+            } catch (InterruptedException e) {
+                log.warn("Error while waiting for Login thread to shutdown: " 
+ e, e);
+            }
+        }
+    }
+
+    @Override
+    public Subject subject() {
+        return subject;
+    }
+
+    @Override
+    public String serviceName() {
+        return serviceName;
+    }
+
+    private String getServiceName(Map<String, ?> configs, String loginContext) {
+        String jaasServiceName;
+        try {
+            jaasServiceName = JaasUtils.jaasConfig(loginContext, JaasUtils.SERVICE_NAME);
+        } catch (IOException e) {
+            throw new KafkaException("Jaas configuration not found", e);
+        }
+        String configServiceName = (String) configs.get(SaslConfigs.SASL_KERBEROS_SERVICE_NAME);
+        if (jaasServiceName != null && configServiceName != null && !jaasServiceName.equals(configServiceName)) {
+            String message = "Conflicting serviceName values found in JAAS and Kafka configs " +
+                "value in JAAS file " + jaasServiceName + ", value in Kafka config " + configServiceName;
+            throw new IllegalArgumentException(message);
+        }
+
+        if (jaasServiceName != null)
+            return jaasServiceName;
+        if (configServiceName != null)
+            return configServiceName;
+
+        throw new IllegalArgumentException("No serviceName defined in either JAAS or Kafka config");
+    }
+
+
+    private long getRefreshTime(KerberosTicket tgt) {
+        long start = tgt.getStartTime().getTime();
+        long expires = tgt.getEndTime().getTime();
+        log.info("TGT valid starting at: {}", tgt.getStartTime());
+        log.info("TGT expires: {}", tgt.getEndTime());
+        long proposedRefresh = start + (long) ((expires - start) *
+                (ticketRenewWindowFactor + (ticketRenewJitter * RNG.nextDouble())));
+
+        if (proposedRefresh > expires)
+            // proposedRefresh is too far in the future: it's after ticket expires: simply return now.
+            return currentWallTime();
+        else
+            return proposedRefresh;
+    }
+
+    private synchronized KerberosTicket getTGT() {
+        Set<KerberosTicket> tickets = subject.getPrivateCredentials(KerberosTicket.class);
+        for (KerberosTicket ticket : tickets) {
+            KerberosPrincipal server = ticket.getServer();
+            if (server.getName().equals("krbtgt/" + server.getRealm() + "@" + server.getRealm())) {
+                log.debug("Found TGT {}.", ticket);
+                return ticket;
+            }
+        }
+        return null;
+    }
+
+    private boolean hasSufficientTimeElapsed() {
+        long now = currentElapsedTime();
+        if (now - lastLogin < minTimeBeforeRelogin) {
+            log.warn("Not attempting to re-login since the last re-login was 
attempted less than {} seconds before.",
+                    minTimeBeforeRelogin / 1000);
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Re-login a principal. This method assumes that {@link #login()} has happened already.
+     * @throws javax.security.auth.login.LoginException on a failure
+     */
+    private synchronized void reLogin() throws LoginException {
+        if (!isKrbTicket) {
+            return;
+        }
+        if (loginContext == null) {
+            throw new LoginException("Login must be done first");
+        }
+        if (!hasSufficientTimeElapsed()) {
+            return;
+        }
+        log.info("Initiating logout for {}", principal);
+        synchronized (KerberosLogin.class) {
+            // register most recent relogin attempt
+            lastLogin = currentElapsedTime();
+            //clear up the kerberos state. But the tokens are not cleared! As per
+            //the Java kerberos login module code, only the kerberos credentials
+            //are cleared
+            loginContext.logout();
+            //login and also update the subject field of this instance to
+            //have the new credentials (pass it to the LoginContext constructor)
+            loginContext = new LoginContext(loginContextName, subject);
+            log.info("Initiating re-login for {}", principal);
+            loginContext.login();
+        }
+    }
+
+    private long currentElapsedTime() {
+        return time.nanoseconds() / 1000000;
+    }
+
+    private long currentWallTime() {
+        return time.milliseconds();
+    }
+
+}
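
To make the renewal scheduling above concrete: the refresh time is the ticket start plus the ticket lifetime scaled by the renew window factor and a small random jitter, then clamped so it is never sooner than minTimeBeforeRelogin from now and never past expiry. The following standalone sketch (not part of the commit, with arbitrary example values rather than Kafka's configured defaults) reproduces that arithmetic.

    import java.util.Random;

    class RefreshTimeSketch {
        public static void main(String[] args) {
            Random rng = new Random();
            long now = System.currentTimeMillis();
            long start = now;
            long expires = start + 10 * 60 * 60 * 1000L;   // example: ticket valid for 10 hours
            double windowFactor = 0.80;                     // example value
            double jitter = 0.05;                           // example value
            long minTimeBeforeRelogin = 60 * 1000L;         // example value: 1 minute

            // Mirrors getRefreshTime(): 80-85% of the ticket lifetime after start.
            long proposed = start + (long) ((expires - start) * (windowFactor + jitter * rng.nextDouble()));
            long refresh = proposed > expires ? now : proposed;
            // Mirrors the floor applied in the refresh loop.
            refresh = Math.max(refresh, now + minTimeBeforeRelogin);
            System.out.printf("Sleeping %.2f hours before re-login%n", (refresh - now) / 3.6e6);
        }
    }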

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java
deleted file mode 100644
index 2e1a056..0000000
--- a/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java
+++ /dev/null
@@ -1,379 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.common.security.kerberos;
-
-import javax.security.auth.kerberos.KerberosPrincipal;
-import javax.security.auth.login.AppConfigurationEntry;
-import javax.security.auth.login.Configuration;
-import javax.security.auth.login.LoginContext;
-import javax.security.auth.login.LoginException;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.kerberos.KerberosTicket;
-import javax.security.auth.Subject;
-
-import org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.ClientCallbackHandler;
-import org.apache.kafka.common.security.JaasUtils;
-import org.apache.kafka.common.config.SaslConfigs;
-import org.apache.kafka.common.utils.Shell;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.SystemTime;
-
-import org.apache.kafka.common.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Date;
-import java.util.Random;
-import java.util.Set;
-import java.util.Map;
-
-/**
- * This class is responsible for refreshing Kerberos credentials for
- * logins for both Kafka client and server.
- */
-public class Login {
-    private static final Logger log = LoggerFactory.getLogger(Login.class);
-
-    private static final Random RNG = new Random();
-
-    private final Thread t;
-    private final boolean isKrbTicket;
-    private final boolean isUsingTicketCache;
-
-    private final String loginContextName;
-    private final String principal;
-    private final Time time = new SystemTime();
-    private final CallbackHandler callbackHandler = new ClientCallbackHandler();
-
-    // LoginThread will sleep until 80% of time from last refresh to
-    // ticket's expiry has been reached, at which time it will wake
-    // and try to renew the ticket.
-    private final double ticketRenewWindowFactor;
-
-    /**
-     * Percentage of random jitter added to the renewal time
-     */
-    private final double ticketRenewJitter;
-
-    // Regardless of ticketRenewWindowFactor setting above and the ticket expiry time,
-    // thread will not sleep between refresh attempts any less than 1 minute (60*1000 milliseconds = 1 minute).
-    // Change the '1' to e.g. 5, to change this to 5 minutes.
-    private final long minTimeBeforeRelogin;
-
-    private final String kinitCmd;
-
-    private volatile Subject subject;
-
-    private LoginContext login;
-    private long lastLogin;
-
-    /**
-     * Login constructor. The constructor starts the thread used
-     * to periodically re-login to the Kerberos Ticket Granting Server.
-     * @param loginContextName
-     *               name of section in JAAS file that will be use to login.
-     *               Passed as first param to 
javax.security.auth.login.LoginContext().
-     * @param configs configure Login with the given key-value pairs.
-     * @throws javax.security.auth.login.LoginException
-     *               Thrown if authentication fails.
-     */
-    public Login(final String loginContextName, Map<String, ?> configs) throws LoginException {
-        this.loginContextName = loginContextName;
-        this.ticketRenewWindowFactor = (Double) configs.get(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR);
-        this.ticketRenewJitter = (Double) configs.get(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER);
-        this.minTimeBeforeRelogin = (Long) configs.get(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN);
-        this.kinitCmd = (String) configs.get(SaslConfigs.SASL_KERBEROS_KINIT_CMD);
-
-        this.lastLogin = currentElapsedTime();
-        login = login(loginContextName);
-        subject = login.getSubject();
-        isKrbTicket = !subject.getPrivateCredentials(KerberosTicket.class).isEmpty();
-
-        AppConfigurationEntry[] entries = Configuration.getConfiguration().getAppConfigurationEntry(loginContextName);
-        if (entries.length == 0) {
-            isUsingTicketCache = false;
-            principal = null;
-        } else {
-            // there will only be a single entry
-            AppConfigurationEntry entry = entries[0];
-            if (entry.getOptions().get("useTicketCache") != null) {
-                String val = (String) entry.getOptions().get("useTicketCache");
-                isUsingTicketCache = val.equals("true");
-            } else
-                isUsingTicketCache = false;
-            if (entry.getOptions().get("principal") != null)
-                principal = (String) entry.getOptions().get("principal");
-            else
-                principal = null;
-        }
-
-        if (!isKrbTicket) {
-            log.debug("It is not a Kerberos ticket");
-            t = null;
-            // if no TGT, do not bother with ticket management.
-            return;
-        }
-        log.debug("It is a Kerberos ticket");
-
-        // Refresh the Ticket Granting Ticket (TGT) periodically. How often to 
refresh is determined by the
-        // TGT's existing expiry date and the configured minTimeBeforeRelogin. 
For testing and development,
-        // you can decrease the interval of expiration of tickets (for 
example, to 3 minutes) by running:
-        //  "modprinc -maxlife 3mins <principal>" in kadmin.
-        t = Utils.newThread("kafka-kerberos-refresh-thread", new Runnable() {
-            public void run() {
-                log.info("TGT refresh thread started.");
-                while (true) {  // renewal thread's main loop. if it exits 
from here, thread will exit.
-                    KerberosTicket tgt = getTGT();
-                    long now = currentWallTime();
-                    long nextRefresh;
-                    Date nextRefreshDate;
-                    if (tgt == null) {
-                        nextRefresh = now + minTimeBeforeRelogin;
-                        nextRefreshDate = new Date(nextRefresh);
-                        log.warn("No TGT found: will try again at {}", 
nextRefreshDate);
-                    } else {
-                        nextRefresh = getRefreshTime(tgt);
-                        long expiry = tgt.getEndTime().getTime();
-                        Date expiryDate = new Date(expiry);
-                        if (isUsingTicketCache && tgt.getRenewTill() != null 
&& tgt.getRenewTill().getTime() < expiry) {
-                            log.error("The TGT cannot be renewed beyond the 
next expiry date: {}." +
-                                    "This process will not be able to 
authenticate new SASL connections after that " +
-                                    "time (for example, it will not be able to 
authenticate a new connection with a Kafka " +
-                                    "Broker).  Ask your system administrator 
to either increase the " +
-                                    "'renew until' time by doing : 'modprinc 
-maxrenewlife {} ' within " +
-                                    "kadmin, or instead, to generate a keytab 
for {}. Because the TGT's " +
-                                    "expiry cannot be further extended by 
refreshing, exiting refresh thread now.",
-                                    expiryDate, principal, principal);
-                            return;
-                        }
-                        // determine how long to sleep from looking at 
ticket's expiry.
-                        // We should not allow the ticket to expire, but we 
should take into consideration
-                        // minTimeBeforeRelogin. Will not sleep less than 
minTimeBeforeRelogin, unless doing so
-                        // would cause ticket expiration.
-                        if ((nextRefresh > expiry) || (now + 
minTimeBeforeRelogin > expiry)) {
-                            // expiry is before next scheduled refresh).
-                            log.info("Refreshing now because expiry is before 
next scheduled refresh time.");
-                            nextRefresh = now;
-                        } else {
-                            if (nextRefresh < (now + minTimeBeforeRelogin)) {
-                                // next scheduled refresh is sooner than (now 
+ MIN_TIME_BEFORE_LOGIN).
-                                Date until = new Date(nextRefresh);
-                                Date newUntil = new Date(now + 
minTimeBeforeRelogin);
-                                log.warn("TGT refresh thread time adjusted 
from {} to {} since the former is sooner " +
-                                        "than the minimum refresh interval ({} 
seconds) from now.",
-                                        until, newUntil, minTimeBeforeRelogin 
/ 1000);
-                            }
-                            nextRefresh = Math.max(nextRefresh, now + 
minTimeBeforeRelogin);
-                        }
-                        nextRefreshDate = new Date(nextRefresh);
-                        if (nextRefresh > expiry) {
-                            log.error("Next refresh: {} is later than expiry 
{}. This may indicate a clock skew problem." +
-                                    "Check that this host and the KDC hosts' 
clocks are in sync. Exiting refresh thread.",
-                                    nextRefreshDate, expiryDate);
-                            return;
-                        }
-                    }
-                    if (now < nextRefresh) {
-                        Date until = new Date(nextRefresh);
-                        log.info("TGT refresh sleeping until: {}", until);
-                        try {
-                            Thread.sleep(nextRefresh - now);
-                        } catch (InterruptedException ie) {
-                            log.warn("TGT renewal thread has been interrupted 
and will exit.");
-                            return;
-                        }
-                    } else {
-                        log.error("NextRefresh: {} is in the past: exiting 
refresh thread. Check"
-                                + " clock sync between this host and KDC - 
(KDC's clock is likely ahead of this host)."
-                                + " Manual intervention will be required for 
this client to successfully authenticate."
-                                + " Exiting refresh thread.", nextRefreshDate);
-                        return;
-                    }
-                    if (isUsingTicketCache) {
-                        String kinitArgs = "-R";
-                        int retry = 1;
-                        while (retry >= 0) {
-                            try {
-                                log.debug("Running ticket cache refresh 
command: {} {}", kinitCmd, kinitArgs);
-                                Shell.execCommand(kinitCmd, kinitArgs);
-                                break;
-                            } catch (Exception e) {
-                                if (retry > 0) {
-                                    --retry;
-                                    // sleep for 10 seconds
-                                    try {
-                                        Thread.sleep(10 * 1000);
-                                    } catch (InterruptedException ie) {
-                                        log.error("Interrupted while renewing 
TGT, exiting Login thread");
-                                        return;
-                                    }
-                                } else {
-                                    log.warn("Could not renew TGT due to 
problem running shell command: '" + kinitCmd
-                                            + " " + kinitArgs + "'" + "; 
exception was: " + e + ". Exiting refresh thread.", e);
-                                    return;
-                                }
-                            }
-                        }
-                    }
-                    try {
-                        int retry = 1;
-                        while (retry >= 0) {
-                            try {
-                                reLogin();
-                                break;
-                            } catch (LoginException le) {
-                                if (retry > 0) {
-                                    --retry;
-                                    // sleep for 10 seconds.
-                                    try {
-                                        Thread.sleep(10 * 1000);
-                                    } catch (InterruptedException e) {
-                                        log.error("Interrupted during login 
retry after LoginException:", le);
-                                        throw le;
-                                    }
-                                } else {
-                                    log.error("Could not refresh TGT for 
principal: " + principal + ".", le);
-                                }
-                            }
-                        }
-                    } catch (LoginException le) {
-                        log.error("Failed to refresh TGT: refresh thread 
exiting now.", le);
-                        return;
-                    }
-                }
-            }
-        }, true);
-    }
-
-    public void startThreadIfNeeded() {
-        // thread object 't' will be null if a refresh thread is not needed.
-        if (t != null) {
-            t.start();
-        }
-    }
-
-    public void shutdown() {
-        if ((t != null) && (t.isAlive())) {
-            t.interrupt();
-            try {
-                t.join();
-            } catch (InterruptedException e) {
-                log.warn("Error while waiting for Login thread to shutdown: " 
+ e, e);
-            }
-        }
-    }
-
-    public Subject subject() {
-        return subject;
-    }
-
-    private synchronized LoginContext login(final String loginContextName) 
throws LoginException {
-        String jaasConfigFile = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM);
-        if (jaasConfigFile == null) {
-            log.debug("System property '" + JaasUtils.JAVA_LOGIN_CONFIG_PARAM + "' is not set, using default JAAS configuration.");
-        }
-        AppConfigurationEntry[] configEntries = Configuration.getConfiguration().getAppConfigurationEntry(loginContextName);
-        if (configEntries == null) {
-            String errorMessage = "Could not find a '" + loginContextName + "' entry in the JAAS configuration. System property '" +
-                JaasUtils.JAVA_LOGIN_CONFIG_PARAM + "' is " + (jaasConfigFile == null ? "not set" : jaasConfigFile);
-            throw new IllegalArgumentException(errorMessage);
-        }
-
-        LoginContext loginContext = new LoginContext(loginContextName, callbackHandler);
-        loginContext.login();
-        log.info("Successfully logged in.");
-        return loginContext;
-    }
-
-    private long getRefreshTime(KerberosTicket tgt) {
-        long start = tgt.getStartTime().getTime();
-        long expires = tgt.getEndTime().getTime();
-        log.info("TGT valid starting at: {}", tgt.getStartTime());
-        log.info("TGT expires: {}", tgt.getEndTime());
-        long proposedRefresh = start + (long) ((expires - start) *
-                (ticketRenewWindowFactor + (ticketRenewJitter * RNG.nextDouble())));
-
-        if (proposedRefresh > expires)
-            // proposedRefresh is too far in the future: it's after ticket expires: simply return now.
-            return currentWallTime();
-        else
-            return proposedRefresh;
-    }
-
-    private synchronized KerberosTicket getTGT() {
-        Set<KerberosTicket> tickets = subject.getPrivateCredentials(KerberosTicket.class);
-        for (KerberosTicket ticket : tickets) {
-            KerberosPrincipal server = ticket.getServer();
-            if (server.getName().equals("krbtgt/" + server.getRealm() + "@" + server.getRealm())) {
-                log.debug("Found TGT {}.", ticket);
-                return ticket;
-            }
-        }
-        return null;
-    }
-
-    private boolean hasSufficientTimeElapsed() {
-        long now = currentElapsedTime();
-        if (now - lastLogin < minTimeBeforeRelogin) {
-            log.warn("Not attempting to re-login since the last re-login was attempted less than {} seconds before.",
-                    minTimeBeforeRelogin / 1000);
-            return false;
-        }
-        return true;
-    }
-
-    /**
-     * Re-login a principal. This method assumes that {@link #login(String)} has happened already.
-     * @throws javax.security.auth.login.LoginException on a failure
-     */
-    private synchronized void reLogin() throws LoginException {
-        if (!isKrbTicket) {
-            return;
-        }
-        if (login == null) {
-            throw new LoginException("Login must be done first");
-        }
-        if (!hasSufficientTimeElapsed()) {
-            return;
-        }
-        log.info("Initiating logout for {}", principal);
-        synchronized (Login.class) {
-            // register most recent relogin attempt
-            lastLogin = currentElapsedTime();
-            //clear up the kerberos state. But the tokens are not cleared! As per
-            //the Java kerberos login module code, only the kerberos credentials
-            //are cleared
-            login.logout();
-            //login and also update the subject field of this instance to
-            //have the new credentials (pass it to the LoginContext constructor)
-            login = new LoginContext(loginContextName, subject);
-            log.info("Initiating re-login for {}", principal);
-            login.login();
-        }
-    }
-
-    private long currentElapsedTime() {
-        return time.nanoseconds() / 1000000;
-    }
-
-    private long currentWallTime() {
-        return time.milliseconds();
-    }
-
-}
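
For context on the getRefreshTime logic removed above: the next refresh is chosen at a randomized point inside the renewal window, start + (end - start) * (windowFactor + jitter * random). A small numeric sketch with assumed inputs (0.8 and 0.05 match the config defaults for the renew window factor and jitter):

    public class RefreshTimeExample {
        public static void main(String[] args) {
            long start = 0L;                        // ticket start time (ms), assumed
            long expires = 10L * 60 * 60 * 1000;    // a 10-hour ticket, assumed
            double renewWindowFactor = 0.80;        // sasl.kerberos.ticket.renew.window.factor default
            double renewJitter = 0.05;              // sasl.kerberos.ticket.renew.jitter default
            double random = 0.5;                    // stand-in for RNG.nextDouble()
            long proposedRefresh = start + (long) ((expires - start)
                    * (renewWindowFactor + renewJitter * random));
            // 0.80 + 0.05 * 0.5 = 0.825, so the refresh lands 8.25 hours after the ticket start.
            System.out.println("Refresh " + proposedRefresh / 3_600_000.0 + " hours after start");
        }
    }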

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/security/kerberos/LoginManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/LoginManager.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/LoginManager.java
deleted file mode 100644
index e163ba8..0000000
--- a/clients/src/main/java/org/apache/kafka/common/security/kerberos/LoginManager.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.common.security.kerberos;
-
-import javax.security.auth.Subject;
-import javax.security.auth.login.LoginException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.EnumMap;
-import java.util.Map;
-
-import org.apache.kafka.common.config.SaslConfigs;
-import org.apache.kafka.common.network.LoginType;
-import org.apache.kafka.common.security.JaasUtils;
-
-public class LoginManager {
-
-    private static final EnumMap<LoginType, LoginManager> CACHED_INSTANCES = new EnumMap<>(LoginType.class);
-
-    private final Login login;
-    private final String serviceName;
-    private final LoginType loginType;
-    private int refCount;
-
-    private LoginManager(LoginType loginType, Map<String, ?> configs) throws IOException, LoginException {
-        this.loginType = loginType;
-        String loginContext = loginType.contextName();
-        login = new Login(loginContext, configs);
-        this.serviceName = getServiceName(loginContext, configs);
-        login.startThreadIfNeeded();
-    }
-
-    private static String getServiceName(String loginContext, Map<String, ?> configs) throws IOException {
-        String jaasServiceName = JaasUtils.jaasConfig(loginContext, JaasUtils.SERVICE_NAME);
-        String configServiceName = (String) configs.get(SaslConfigs.SASL_KERBEROS_SERVICE_NAME);
-        if (jaasServiceName != null && configServiceName != null && !jaasServiceName.equals(configServiceName)) {
-            String message = "Conflicting serviceName values found in JAAS and Kafka configs " +
-                "value in JAAS file " + jaasServiceName + ", value in Kafka config " + configServiceName;
-            throw new IllegalArgumentException(message);
-        }
-
-        if (jaasServiceName != null)
-            return jaasServiceName;
-        if (configServiceName != null)
-            return configServiceName;
-
-        throw new IllegalArgumentException("No serviceName defined in either JAAS or Kafka config");
-    }
-
-    /**
-     * Returns an instance of `LoginManager` and increases its reference count.
-     *
-     * `release()` should be invoked when the `LoginManager` is no longer needed. This method will try to reuse an
-     * existing `LoginManager` for the provided `mode` if available. However, it expects `configs` to be the same for
-     * every invocation and it will ignore them in the case where it's returning a cached instance of `LoginManager`.
-     *
-     * This is a bit ugly and it would be nicer if we could pass the `LoginManager` to `ChannelBuilders.create` and
-     * shut it down when the broker or clients are closed. It's straightforward to do the former, but it's more
-     * complicated to do the latter without making the consumer API more complex.
-     *
-     * @param loginType the type of the login context, it should be SERVER for the broker and CLIENT for the clients
-     *                  (i.e. consumer and producer)
-     * @param configs configuration as key/value pairs
-     */
-    public static final LoginManager acquireLoginManager(LoginType loginType, Map<String, ?> configs) throws IOException, LoginException {
-        synchronized (LoginManager.class) {
-            LoginManager loginManager = CACHED_INSTANCES.get(loginType);
-            if (loginManager == null) {
-                loginManager = new LoginManager(loginType, configs);
-                CACHED_INSTANCES.put(loginType, loginManager);
-            }
-            return loginManager.acquire();
-        }
-    }
-
-    public Subject subject() {
-        return login.subject();
-    }
-
-    public String serviceName() {
-        return serviceName;
-    }
-
-    private LoginManager acquire() {
-        ++refCount;
-        return this;
-    }
-
-    /**
-     * Decrease the reference count for this instance and release resources if it reaches 0.
-     */
-    public void release() {
-        synchronized (LoginManager.class) {
-            if (refCount == 0)
-                throw new IllegalStateException("release called on LoginManager with refCount == 0");
-            else if (refCount == 1) {
-                CACHED_INSTANCES.remove(loginType);
-                login.shutdown();
-            }
-            --refCount;
-        }
-    }
-
-    /* Should only be used in tests. */
-    public static void closeAll() {
-        synchronized (LoginManager.class) {
-            for (LoginType loginType : new ArrayList<>(CACHED_INSTANCES.keySet())) {
-                LoginManager loginManager = CACHED_INSTANCES.remove(loginType);
-                loginManager.login.shutdown();
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/security/plain/PlainLoginModule.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/plain/PlainLoginModule.java b/clients/src/main/java/org/apache/kafka/common/security/plain/PlainLoginModule.java
new file mode 100644
index 0000000..f06fbf6
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/plain/PlainLoginModule.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.plain;
+
+import java.util.Map;
+
+import javax.security.auth.Subject;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.login.LoginException;
+import javax.security.auth.spi.LoginModule;
+
+public class PlainLoginModule implements LoginModule {
+
+    private static final String USERNAME_CONFIG = "username";
+    private static final String PASSWORD_CONFIG = "password";
+
+    static {
+        PlainSaslServerProvider.initialize();
+    }
+
+    @Override
+    public void initialize(Subject subject, CallbackHandler callbackHandler, Map<String, ?> sharedState, Map<String, ?> options) {
+        String username = (String) options.get(USERNAME_CONFIG);
+        if (username != null)
+            subject.getPublicCredentials().add(username);
+        String password = (String) options.get(PASSWORD_CONFIG);
+        if (password != null)
+            subject.getPrivateCredentials().add(password);
+    }
+
+    @Override
+    public boolean login() throws LoginException {
+        return true;
+    }
+
+    @Override
+    public boolean logout() throws LoginException {
+        return true;
+    }
+
+    @Override
+    public boolean commit() throws LoginException {
+        return true;
+    }
+
+    @Override
+    public boolean abort() throws LoginException {
+        return false;
+    }
+}
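
As a usage sketch: PlainLoginModule above only copies the "username" and "password" options into the Subject's credentials; the per-user passwords checked on the server side come from user_<username> options in the same JAAS login context (see PlainSaslServer below). A purely illustrative broker-side JAAS stanza (all names and secrets invented) could look roughly like:

    KafkaServer {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="admin"
        password="admin-secret"
        user_admin="admin-secret"
        user_alice="alice-secret";
    };

Here username/password give the broker its own identity for SASL/PLAIN connections it initiates, while each user_<name> option defines a password the broker will accept for that user.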

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServer.java b/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServer.java
new file mode 100644
index 0000000..5c6fd78
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServer.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.plain;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+import java.util.Map;
+
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+import javax.security.sasl.SaslServerFactory;
+
+import org.apache.kafka.common.network.LoginType;
+import org.apache.kafka.common.security.JaasUtils;
+
+/**
+ * Simple SaslServer implementation for SASL/PLAIN. In order to make this implementation
+ * fully pluggable, authentication of username/password is fully contained within the
+ * server implementation.
+ * <p>
+ * Valid users with passwords are specified in the Jaas configuration file. Each user
+ * is specified with user_<username> as key and <password> as value. This is consistent
+ * with Zookeeper Digest-MD5 implementation.
+ * <p>
+ * To avoid storing clear passwords on disk or to integrate with external authentication
+ * servers in production systems, this module can be replaced with a different implementation.
+ *
+ */
+public class PlainSaslServer implements SaslServer {
+
+    public static final String PLAIN_MECHANISM = "PLAIN";
+    private static final String JAAS_USER_PREFIX = "user_";
+
+    private boolean complete;
+    private String authorizationID;
+
+    public PlainSaslServer(CallbackHandler callbackHandler) {
+    }
+
+    @Override
+    public byte[] evaluateResponse(byte[] response) throws SaslException {
+        /*
+         * Message format (from https://tools.ietf.org/html/rfc4616):
+         *
+         * message   = [authzid] UTF8NUL authcid UTF8NUL passwd
+         * authcid   = 1*SAFE ; MUST accept up to 255 octets
+         * authzid   = 1*SAFE ; MUST accept up to 255 octets
+         * passwd    = 1*SAFE ; MUST accept up to 255 octets
+         * UTF8NUL   = %x00 ; UTF-8 encoded NUL character
+         *
+         * SAFE      = UTF1 / UTF2 / UTF3 / UTF4
+         *                ;; any UTF-8 encoded Unicode character except NUL
+         */
+
+        String[] tokens;
+        try {
+            tokens = new String(response, "UTF-8").split("\u0000");
+        } catch (UnsupportedEncodingException e) {
+            throw new SaslException("UTF-8 encoding not supported", e);
+        }
+        if (tokens.length != 3)
+            throw new SaslException("Invalid SASL/PLAIN response: expected 3 tokens, got " + tokens.length);
+        authorizationID = tokens[0];
+        String username = tokens[1];
+        String password = tokens[2];
+
+        if (username.isEmpty()) {
+            throw new SaslException("Authentication failed: username not specified");
+        }
+        if (password.isEmpty()) {
+            throw new SaslException("Authentication failed: password not specified");
+        }
+        if (authorizationID.isEmpty())
+            authorizationID = username;
+
+        try {
+            String expectedPassword = JaasUtils.jaasConfig(LoginType.SERVER.contextName(), JAAS_USER_PREFIX + username);
+            if (!password.equals(expectedPassword)) {
+                throw new SaslException("Authentication failed: Invalid username or password");
+            }
+        } catch (IOException e) {
+            throw new SaslException("Authentication failed: Invalid JAAS configuration", e);
+        }
+        complete = true;
+        return new byte[0];
+    }
+
+    @Override
+    public String getAuthorizationID() {
+        if (!complete)
+            throw new IllegalStateException("Authentication exchange has not completed");
+        return authorizationID;
+    }
+
+    @Override
+    public String getMechanismName() {
+        return PLAIN_MECHANISM;
+    }
+
+    @Override
+    public Object getNegotiatedProperty(String propName) {
+        if (!complete)
+            throw new IllegalStateException("Authentication exchange has not completed");
+        return null;
+    }
+
+    @Override
+    public boolean isComplete() {
+        return complete;
+    }
+
+    @Override
+    public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException {
+        if (!complete)
+            throw new IllegalStateException("Authentication exchange has not completed");
+        return Arrays.copyOfRange(incoming, offset, offset + len);
+    }
+
+    @Override
+    public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException {
+        if (!complete)
+            throw new IllegalStateException("Authentication exchange has not completed");
+        return Arrays.copyOfRange(outgoing, offset, offset + len);
+    }
+
+    @Override
+    public void dispose() throws SaslException {
+    }
+
+    public static class PlainSaslServerFactory implements SaslServerFactory {
+
+        @Override
+        public SaslServer createSaslServer(String mechanism, String protocol, String serverName, Map<String, ?> props, CallbackHandler cbh)
+            throws SaslException {
+
+            if (!PLAIN_MECHANISM.equals(mechanism)) {
+                throw new SaslException(String.format("Mechanism \'%s\' is not supported. Only PLAIN is supported.", mechanism));
+            }
+            return new PlainSaslServer(cbh);
+        }
+
+        @Override
+        public String[] getMechanismNames(Map<String, ?> props) {
+            String noPlainText = (String) props.get(Sasl.POLICY_NOPLAINTEXT);
+            if ("true".equals(noPlainText))
+                return new String[]{};
+            else
+                return new String[]{PLAIN_MECHANISM};
+        }
+    }
+}
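
To make the parsing in evaluateResponse() above concrete: the client's initial SASL/PLAIN response is simply the three RFC 4616 fields joined by NUL bytes. A minimal sketch (user and password are invented and would have to match the user_<name> entries in the server's JAAS context):

    import java.nio.charset.StandardCharsets;

    public class PlainResponseExample {
        public static void main(String[] args) {
            String authzid = "";               // empty authzid: the server falls back to the username
            String username = "alice";         // hypothetical user
            String password = "alice-secret";  // hypothetical password
            byte[] response = (authzid + "\u0000" + username + "\u0000" + password)
                    .getBytes(StandardCharsets.UTF_8);
            // Feeding these bytes to PlainSaslServer.evaluateResponse() yields three tokens
            // after splitting on the NUL delimiter and, on a password match, an empty challenge.
            System.out.println(response.length + " bytes in the initial PLAIN response");
        }
    }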

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServerProvider.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServerProvider.java b/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServerProvider.java
new file mode 100644
index 0000000..c3db1f5
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServerProvider.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.security.plain;
+
+import java.security.Provider;
+import java.security.Security;
+
+import org.apache.kafka.common.security.plain.PlainSaslServer.PlainSaslServerFactory;
+
+public class PlainSaslServerProvider extends Provider {
+
+    private static final long serialVersionUID = 1L;
+
+    protected PlainSaslServerProvider() {
+        super("Simple SASL/PLAIN Server Provider", 1.0, "Simple SASL/PLAIN Server Provider for Kafka");
+        super.put("SaslServerFactory." + PlainSaslServer.PLAIN_MECHANISM, PlainSaslServerFactory.class.getName());
+    }
+
+    public static void initialize() {
+        Security.addProvider(new PlainSaslServerProvider());
+    }
+}
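
The provider is the glue that lets the standard javax.security.sasl factory lookup resolve the mechanism name to PlainSaslServerFactory. A rough sketch of that lookup (the server name and empty property map are assumptions for illustration, not values taken from this patch):

    import java.util.Collections;
    import javax.security.sasl.Sasl;
    import javax.security.sasl.SaslServer;
    import org.apache.kafka.common.security.plain.PlainSaslServer;
    import org.apache.kafka.common.security.plain.PlainSaslServerProvider;

    public class PlainLookupExample {
        public static void main(String[] args) throws Exception {
            PlainSaslServerProvider.initialize();     // registers "SaslServerFactory.PLAIN"
            SaslServer server = Sasl.createSaslServer(
                    PlainSaslServer.PLAIN_MECHANISM,  // "PLAIN"
                    "kafka",                          // protocol label, illustrative
                    "broker.example.com",             // hypothetical server name
                    Collections.<String, Object>emptyMap(),
                    null);                            // PlainSaslServer ignores the callback handler
            System.out.println("Created SaslServer for " + server.getMechanismName());
        }
    }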

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 9def557..92f3101 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -91,7 +91,10 @@ public class RequestResponseTest {
                 createUpdateMetadataResponse(),
                 createLeaderAndIsrRequest(),
                 createLeaderAndIsrRequest().getErrorResponse(0, new UnknownServerException()),
-                createLeaderAndIsrResponse()
+                createLeaderAndIsrResponse(),
+                createSaslHandshakeRequest(),
+                createSaslHandshakeRequest().getErrorResponse(0, new UnknownServerException()),
+                createSaslHandshakeResponse()
         );
 
         for (AbstractRequestResponse req : requestResponseList)
@@ -425,5 +428,11 @@ public class RequestResponseTest {
         return new UpdateMetadataResponse(Errors.NONE.code());
     }
 
+    private AbstractRequest createSaslHandshakeRequest() {
+        return new SaslHandshakeRequest("PLAIN");
+    }
 
+    private AbstractRequestResponse createSaslHandshakeResponse() {
+        return new SaslHandshakeResponse(Errors.NONE.code(), Collections.singletonList("GSSAPI"));
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
index fbe2630..91e921f 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java
@@ -210,6 +210,7 @@ public class TestSslUtils {
             X509Certificate cCert = generateCertificate("CN=" + host + ", O=A client", cKP, 30, "SHA1withRSA");
             createKeyStore(keyStoreFile.getPath(), password, "client", cKP.getPrivate(), cCert);
             certs.put(certAlias, cCert);
+            keyStoreFile.deleteOnExit();
         } else if (mode == Mode.SERVER) {
             keyStoreFile = File.createTempFile("serverKS", ".jks");
             KeyPair sKP = generateKeyPair("RSA");
@@ -217,10 +218,12 @@ public class TestSslUtils {
                                                         "SHA1withRSA");
             createKeyStore(keyStoreFile.getPath(), password, password, "server", sKP.getPrivate(), sCert);
             certs.put(certAlias, sCert);
+            keyStoreFile.deleteOnExit();
         }
 
         if (trustStore) {
             createTrustStore(trustStoreFile.getPath(), trustStorePassword, certs);
+            trustStoreFile.deleteOnExit();
         }
 
         return createSslConfig(mode, keyStoreFile, password, password, trustStoreFile, trustStorePassword);

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/core/src/main/scala/kafka/common/ErrorMapping.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala
index 9708c4e..91a1d75 100644
--- a/core/src/main/scala/kafka/common/ErrorMapping.scala
+++ b/core/src/main/scala/kafka/common/ErrorMapping.scala
@@ -63,6 +63,8 @@ object ErrorMapping {
   val GroupAuthorizationCode: Short = 30
   val ClusterAuthorizationCode: Short = 31
   // 32: INVALID_TIMESTAMP
+  // 33: UNSUPPORTED_SASL_MECHANISM
+  // 34: ILLEGAL_SASL_STATE
 
   private val exceptionToCode =
     Map[Class[Throwable], Short](

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index e9731fd..018946e 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -89,6 +89,14 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
     val brokerEndPoint = broker.getBrokerEndPoint(config.interBrokerSecurityProtocol)
     val brokerNode = new Node(broker.id, brokerEndPoint.host, brokerEndPoint.port)
     val networkClient = {
+      val channelBuilder = ChannelBuilders.create(
+        config.interBrokerSecurityProtocol,
+        Mode.CLIENT,
+        LoginType.SERVER,
+        config.values,
+        config.saslMechanismInterBrokerProtocol,
+        config.saslInterBrokerHandshakeRequestEnable
+      )
       val selector = new Selector(
         NetworkReceive.UNLIMITED,
         config.connectionsMaxIdleMs,
@@ -97,7 +105,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
         "controller-channel",
         Map("broker-id" -> broker.id.toString).asJava,
         false,
-        ChannelBuilders.create(config.interBrokerSecurityProtocol, Mode.CLIENT, LoginType.SERVER, config.values)
+        channelBuilder
       )
       new NetworkClient(
         selector,

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index f1ec2ef..b757abd 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -399,7 +399,7 @@ private[kafka] class Processor(val id: Int,
     "socket-server",
     metricTags,
     false,
-    ChannelBuilders.create(protocol, Mode.SERVER, LoginType.SERVER, channelConfigs))
+    ChannelBuilders.create(protocol, Mode.SERVER, LoginType.SERVER, channelConfigs, null, true))
 
   override def run() {
     startupComplete()

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 4f77d30..9afefa5 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -49,6 +49,7 @@ import org.apache.kafka.common.internals.TopicConstants
 
 import scala.collection._
 import scala.collection.JavaConverters._
+import org.apache.kafka.common.requests.SaslHandshakeResponse
 
 /**
  * Logic to handle the various Kafka requests
@@ -93,6 +94,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
         case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
         case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
+        case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
         case requestId => throw new KafkaException("Unknown api code " + requestId)
       }
     } catch {
@@ -993,6 +995,12 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  def handleSaslHandshakeRequest(request: RequestChannel.Request) {
+    val respHeader = new ResponseHeader(request.header.correlationId)
+    val response = new SaslHandshakeResponse(Errors.ILLEGAL_SASL_STATE.code, config.saslEnabledMechanisms)
+    requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, respHeader, response)))
+  }
+
   def close() {
     quotaManagers.foreach { case (apiKey, quotaManager) =>
       quotaManager.shutdown()

http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 5c0d27a..5e28bd7 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -19,7 +19,7 @@ package kafka.server
 
 import java.util.Properties
 
-import kafka.api.ApiVersion
+import kafka.api.{ApiVersion, KAFKA_0_10_0_IV0}
 import kafka.cluster.EndPoint
 import kafka.consumer.ConsumerConfig
 import kafka.coordinator.OffsetConfig
@@ -175,6 +175,8 @@ object Defaults {
   val SslClientAuth = SslClientAuthNone
 
   /** ********* Sasl configuration ***********/
+  val SaslMechanismInterBrokerProtocol = SaslConfigs.DEFAULT_SASL_MECHANISM
+  val SaslEnabledMechanisms = SaslConfigs.DEFAULT_SASL_ENABLED_MECHANISMS
   val SaslKerberosKinitCmd = SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD
   val SaslKerberosTicketRenewWindowFactor = SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR
   val SaslKerberosTicketRenewJitter = SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER
@@ -337,6 +339,8 @@ object KafkaConfig {
   val SslClientAuthProp = SslConfigs.SSL_CLIENT_AUTH_CONFIG
 
   /** ********* SASL Configuration ****************/
+  val SaslMechanismInterBrokerProtocolProp = "sasl.mechanism.inter.broker.protocol"
+  val SaslEnabledMechanismsProp = SaslConfigs.SASL_ENABLED_MECHANISMS
   val SaslKerberosServiceNameProp = SaslConfigs.SASL_KERBEROS_SERVICE_NAME
   val SaslKerberosKinitCmdProp = SaslConfigs.SASL_KERBEROS_KINIT_CMD
   val SaslKerberosTicketRenewWindowFactorProp = SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR
@@ -533,6 +537,8 @@ object KafkaConfig {
   val SslClientAuthDoc = SslConfigs.SSL_CLIENT_AUTH_DOC
 
   /** ********* Sasl Configuration ****************/
+  val SaslMechanismInterBrokerProtocolDoc = "SASL mechanism used for inter-broker communication. Default is GSSAPI."
+  val SaslEnabledMechanismsDoc = SaslConfigs.SASL_ENABLED_MECHANISMS_DOC
   val SaslKerberosServiceNameDoc = SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC
   val SaslKerberosKinitCmdDoc = SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC
   val SaslKerberosTicketRenewWindowFactorDoc = SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC
@@ -704,6 +710,8 @@ object KafkaConfig {
       .define(SslCipherSuitesProp, LIST, null, MEDIUM, SslCipherSuitesDoc)
 
       /** ********* Sasl Configuration ****************/
+      .define(SaslMechanismInterBrokerProtocolProp, STRING, Defaults.SaslMechanismInterBrokerProtocol, MEDIUM, SaslMechanismInterBrokerProtocolDoc)
+      .define(SaslEnabledMechanismsProp, LIST, Defaults.SaslEnabledMechanisms, MEDIUM, SaslEnabledMechanismsDoc)
       .define(SaslKerberosServiceNameProp, STRING, null, MEDIUM, SaslKerberosServiceNameDoc)
       .define(SaslKerberosKinitCmdProp, STRING, Defaults.SaslKerberosKinitCmd, MEDIUM, SaslKerberosKinitCmdDoc)
       .define(SaslKerberosTicketRenewWindowFactorProp, DOUBLE, Defaults.SaslKerberosTicketRenewWindowFactor, MEDIUM, SaslKerberosTicketRenewWindowFactorDoc)
@@ -894,12 +902,15 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
   val sslCipher = getList(KafkaConfig.SslCipherSuitesProp)
 
   /** ********* Sasl Configuration **************/
+  val saslMechanismInterBrokerProtocol = getString(KafkaConfig.SaslMechanismInterBrokerProtocolProp)
+  val saslEnabledMechanisms = getList(KafkaConfig.SaslEnabledMechanismsProp)
   val saslKerberosServiceName = getString(KafkaConfig.SaslKerberosServiceNameProp)
   val saslKerberosKinitCmd = getString(KafkaConfig.SaslKerberosKinitCmdProp)
   val saslKerberosTicketRenewWindowFactor = getDouble(KafkaConfig.SaslKerberosTicketRenewWindowFactorProp)
   val saslKerberosTicketRenewJitter = getDouble(KafkaConfig.SaslKerberosTicketRenewJitterProp)
   val saslKerberosMinTimeBeforeRelogin = getLong(KafkaConfig.SaslKerberosMinTimeBeforeReloginProp)
   val saslKerberosPrincipalToLocalRules = getList(KafkaConfig.SaslKerberosPrincipalToLocalRulesProp)
+  val saslInterBrokerHandshakeRequestEnable = interBrokerProtocolVersion >= KAFKA_0_10_0_IV0
 
   /** ********* Quota Configuration **************/
   val producerQuotaBytesPerSecondDefault = getLong(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp)
@@ -1009,5 +1020,10 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
     )
     require(interBrokerProtocolVersion >= logMessageFormatVersion,
       s"log.message.format.version $logMessageFormatVersionString cannot be 
used when inter.broker.protocol.version is set to 
$interBrokerProtocolVersionString")
+    val interBrokerUsesSasl = interBrokerSecurityProtocol == 
SecurityProtocol.SASL_PLAINTEXT || interBrokerSecurityProtocol == 
SecurityProtocol.SASL_SSL
+    require(!interBrokerUsesSasl || saslInterBrokerHandshakeRequestEnable || 
saslMechanismInterBrokerProtocol == SaslConfigs.GSSAPI_MECHANISM,
+      s"Only GSSAPI mechanism is supported for inter-broker communication with 
SASL when inter.broker.protocol.version is set to 
$interBrokerProtocolVersionString")
+    require(!interBrokerUsesSasl || 
saslEnabledMechanisms.contains(saslMechanismInterBrokerProtocol),
+      s"${KafkaConfig.SaslMechanismInterBrokerProtocolProp} must be included 
in ${KafkaConfig.SaslEnabledMechanismsProp} when SASL is used for inter-broker 
communication")
   }
 }
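
Pulling the KafkaConfig changes together, a hypothetical server.properties fragment (other listener and security settings elided) that satisfies the new validation could read:

    # Mechanisms the broker accepts from clients (sasl.enabled.mechanisms)
    sasl.enabled.mechanisms=GSSAPI,PLAIN
    # Mechanism used between brokers; must be one of the enabled mechanisms
    sasl.mechanism.inter.broker.protocol=GSSAPI
    security.inter.broker.protocol=SASL_PLAINTEXT

Per the require() calls above, a non-GSSAPI inter-broker mechanism additionally needs inter.broker.protocol.version to be at least 0.10.0 (the saslInterBrokerHandshakeRequestEnable flag).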
