http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java index 1f925f9..89c6e6c 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java @@ -18,11 +18,12 @@ package org.apache.kafka.common.security.authenticator; import java.io.IOException; +import java.util.HashSet; +import java.util.List; import java.util.Map; - +import java.util.Set; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; - import java.security.Principal; import java.security.PrivilegedActionException; import java.security.PrivilegedExceptionAction; @@ -44,51 +45,102 @@ import org.ietf.jgss.GSSName; import org.ietf.jgss.Oid; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - +import org.apache.kafka.common.security.auth.AuthCallbackHandler; import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.errors.AuthenticationException; +import org.apache.kafka.common.errors.IllegalSaslStateException; +import org.apache.kafka.common.errors.UnsupportedSaslMechanismException; import org.apache.kafka.common.network.Authenticator; +import org.apache.kafka.common.network.Mode; import org.apache.kafka.common.network.NetworkSend; import org.apache.kafka.common.network.NetworkReceive; import org.apache.kafka.common.network.TransportLayer; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.SchemaException; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.AbstractRequestResponse; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.requests.ResponseHeader; +import org.apache.kafka.common.requests.ResponseSend; +import org.apache.kafka.common.requests.SaslHandshakeRequest; +import org.apache.kafka.common.requests.SaslHandshakeResponse; import org.apache.kafka.common.security.auth.PrincipalBuilder; public class SaslServerAuthenticator implements Authenticator { private static final Logger LOG = LoggerFactory.getLogger(SaslServerAuthenticator.class); - private final SaslServer saslServer; - private final Subject subject; + public enum SaslState { + HANDSHAKE_REQUEST, AUTHENTICATE, COMPLETE, FAILED + } + private final String node; + private final Subject subject; private final KerberosShortNamer kerberosNamer; private final int maxReceiveSize; + private final String host; + + // Current SASL state + private SaslState saslState = SaslState.HANDSHAKE_REQUEST; + // Next SASL state to be set when outgoing writes associated with the current SASL state complete + private SaslState pendingSaslState = null; + private SaslServer saslServer; + private String saslMechanism; + private AuthCallbackHandler callbackHandler; // assigned in `configure` private TransportLayer transportLayer; + private Set<String> enabledMechanisms; + private Map<String, ?> configs; // buffers used in `authenticate` private NetworkReceive netInBuffer; private NetworkSend netOutBuffer; - public SaslServerAuthenticator(String node, final Subject subject, KerberosShortNamer kerberosNameParser, int maxReceiveSize) throws IOException { + public SaslServerAuthenticator(String node, final Subject subject, KerberosShortNamer kerberosNameParser, String host, int maxReceiveSize) throws IOException { if (subject == null) throw new IllegalArgumentException("subject cannot be null"); - if (subject.getPrincipals().isEmpty()) - throw new IllegalArgumentException("subject must have at least one principal"); this.node = node; this.subject = subject; this.kerberosNamer = kerberosNameParser; this.maxReceiveSize = maxReceiveSize; - saslServer = createSaslServer(); + this.host = host; } public void configure(TransportLayer transportLayer, PrincipalBuilder principalBuilder, Map<String, ?> configs) { this.transportLayer = transportLayer; + this.configs = configs; + List<String> enabledMechanisms = (List<String>) this.configs.get(SaslConfigs.SASL_ENABLED_MECHANISMS); + if (enabledMechanisms == null || enabledMechanisms.isEmpty()) + throw new IllegalArgumentException("No SASL mechanisms are enabled"); + this.enabledMechanisms = new HashSet<>(enabledMechanisms); } - private SaslServer createSaslServer() throws IOException { + private void createSaslServer(String mechanism) throws IOException { + this.saslMechanism = mechanism; + callbackHandler = new SaslServerCallbackHandler(Configuration.getConfiguration(), kerberosNamer); + callbackHandler.configure(configs, Mode.SERVER, subject, saslMechanism); + if (mechanism.equals(SaslConfigs.GSSAPI_MECHANISM)) { + if (subject.getPrincipals().isEmpty()) + throw new IllegalArgumentException("subject must have at least one principal"); + saslServer = createSaslKerberosServer(callbackHandler, configs); + } else { + try { + saslServer = Subject.doAs(subject, new PrivilegedExceptionAction<SaslServer>() { + public SaslServer run() throws SaslException { + return Sasl.createSaslServer(saslMechanism, "kafka", host, configs, callbackHandler); + } + }); + } catch (PrivilegedActionException e) { + throw new SaslException("Kafka Server failed to create a SaslServer to interact with a client during session authentication", e.getCause()); + } + } + } + + private SaslServer createSaslKerberosServer(final AuthCallbackHandler saslServerCallbackHandler, final Map<String, ?> configs) throws IOException { // server is using a JAAS-authenticated subject: determine service principal name and hostname from kafka server's subject. - final SaslServerCallbackHandler saslServerCallbackHandler = new SaslServerCallbackHandler( - Configuration.getConfiguration(), kerberosNamer); final Principal servicePrincipal = subject.getPrincipals().iterator().next(); KerberosName kerberosName; try { @@ -99,9 +151,7 @@ public class SaslServerAuthenticator implements Authenticator { final String servicePrincipalName = kerberosName.serviceName(); final String serviceHostname = kerberosName.hostName(); - final String mech = "GSSAPI"; - - LOG.debug("Creating SaslServer for {} with mechanism {}", kerberosName, mech); + LOG.debug("Creating SaslServer for {} with mechanism {}", kerberosName, saslMechanism); // As described in http://docs.oracle.com/javase/8/docs/technotes/guides/security/jgss/jgss-features.html: // "To enable Java GSS to delegate to the native GSS library and its list of native mechanisms, @@ -127,7 +177,7 @@ public class SaslServerAuthenticator implements Authenticator { try { return Subject.doAs(subject, new PrivilegedExceptionAction<SaslServer>() { public SaslServer run() throws SaslException { - return Sasl.createSaslServer(mech, servicePrincipalName, serviceHostname, null, saslServerCallbackHandler); + return Sasl.createSaslServer(saslMechanism, servicePrincipalName, serviceHostname, configs, saslServerCallbackHandler); } }); } catch (PrivilegedActionException e) { @@ -146,8 +196,8 @@ public class SaslServerAuthenticator implements Authenticator { if (netOutBuffer != null && !flushNetOutBufferAndUpdateInterestOps()) return; - if (saslServer.isComplete()) { - transportLayer.removeInterestOps(SelectionKey.OP_WRITE); + if (saslServer != null && saslServer.isComplete()) { + setSaslState(SaslState.COMPLETE); return; } @@ -161,12 +211,28 @@ public class SaslServerAuthenticator implements Authenticator { netInBuffer.payload().get(clientToken, 0, clientToken.length); netInBuffer = null; // reset the networkReceive as we read all the data. try { - byte[] response = saslServer.evaluateResponse(clientToken); - if (response != null) { - netOutBuffer = new NetworkSend(node, ByteBuffer.wrap(response)); - flushNetOutBufferAndUpdateInterestOps(); + switch (saslState) { + case HANDSHAKE_REQUEST: + if (handleKafkaRequest(clientToken)) + break; + // For default GSSAPI, fall through to authenticate using the client token as the first GSSAPI packet. + // This is required for interoperability with 0.9.0.x clients which do not send handshake request + case AUTHENTICATE: + byte[] response = saslServer.evaluateResponse(clientToken); + if (response != null) { + netOutBuffer = new NetworkSend(node, ByteBuffer.wrap(response)); + flushNetOutBufferAndUpdateInterestOps(); + } + // When the authentication exchange is complete and no more tokens are expected from the client, + // update SASL state. Current SASL state will be updated when outgoing writes to the client complete. + if (saslServer.isComplete()) + setSaslState(SaslState.COMPLETE); + break; + default: + break; } } catch (Exception e) { + setSaslState(SaslState.FAILED); throw new IOException(e); } } @@ -177,18 +243,33 @@ public class SaslServerAuthenticator implements Authenticator { } public boolean complete() { - return saslServer.isComplete(); + return saslState == SaslState.COMPLETE; } public void close() throws IOException { - saslServer.dispose(); + if (saslServer != null) + saslServer.dispose(); + if (callbackHandler != null) + callbackHandler.close(); + } + + private void setSaslState(SaslState saslState) { + if (netOutBuffer != null && !netOutBuffer.completed()) + pendingSaslState = saslState; + else { + this.pendingSaslState = null; + this.saslState = saslState; + LOG.debug("Set SASL server state to {}", saslState); + } } private boolean flushNetOutBufferAndUpdateInterestOps() throws IOException { boolean flushedCompletely = flushNetOutBuffer(); - if (flushedCompletely) + if (flushedCompletely) { transportLayer.removeInterestOps(SelectionKey.OP_WRITE); - else + if (pendingSaslState != null) + setSaslState(pendingSaslState); + } else transportLayer.addInterestOps(SelectionKey.OP_WRITE); return flushedCompletely; } @@ -198,4 +279,66 @@ public class SaslServerAuthenticator implements Authenticator { netOutBuffer.writeTo(transportLayer); return netOutBuffer.completed(); } + + private boolean handleKafkaRequest(byte[] requestBytes) throws IOException, AuthenticationException { + boolean isKafkaRequest = false; + String clientMechanism = null; + try { + ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes); + RequestHeader requestHeader = RequestHeader.parse(requestBuffer); + AbstractRequest request = AbstractRequest.getRequest(requestHeader.apiKey(), requestHeader.apiVersion(), requestBuffer); + isKafkaRequest = true; + + ApiKeys apiKey = ApiKeys.forId(requestHeader.apiKey()); + switch (apiKey) { + case SASL_HANDSHAKE: + clientMechanism = handleHandshakeRequest(requestHeader, (SaslHandshakeRequest) request); + break; + default: + throw new IllegalSaslStateException("Unexpected Kafka request of type " + apiKey + " during SASL handshake."); + } + } catch (SchemaException | IllegalArgumentException e) { + // SchemaException is thrown if the request is not in Kafka format. IIlegalArgumentException is thrown + // if the API key is invalid. For compatibility with 0.9.0.x where the first packet is a GSSAPI token + // starting with 0x60, revert to GSSAPI for both these exceptions. + if (LOG.isDebugEnabled()) { + StringBuilder tokenBuilder = new StringBuilder(); + for (byte b : requestBytes) { + tokenBuilder.append(String.format("%02x", b)); + if (tokenBuilder.length() >= 20) + break; + } + LOG.debug("Received client packet of length {} starting with bytes 0x{}, process as GSSAPI packet", requestBytes.length, tokenBuilder); + } + if (enabledMechanisms.contains(SaslConfigs.GSSAPI_MECHANISM)) { + LOG.debug("First client packet is not a SASL mechanism request, using default mechanism GSSAPI"); + clientMechanism = SaslConfigs.GSSAPI_MECHANISM; + } else + throw new UnsupportedSaslMechanismException("Exception handling first SASL packet from client, GSSAPI is not supported by server", e); + } + if (clientMechanism != null) { + createSaslServer(clientMechanism); + setSaslState(SaslState.AUTHENTICATE); + } + return isKafkaRequest; + } + + private String handleHandshakeRequest(RequestHeader requestHeader, SaslHandshakeRequest handshakeRequest) throws IOException, UnsupportedSaslMechanismException { + String clientMechanism = handshakeRequest.mechanism(); + if (enabledMechanisms.contains(clientMechanism)) { + LOG.debug("Using SASL mechanism '{}' provided by client", clientMechanism); + sendKafkaResponse(requestHeader, new SaslHandshakeResponse((short) 0, enabledMechanisms)); + return clientMechanism; + } else { + LOG.debug("SASL mechanism '{}' requested by client is not supported", clientMechanism); + sendKafkaResponse(requestHeader, new SaslHandshakeResponse(Errors.UNSUPPORTED_SASL_MECHANISM.code(), enabledMechanisms)); + throw new UnsupportedSaslMechanismException("Unsupported SASL mechanism " + clientMechanism); + } + } + + private void sendKafkaResponse(RequestHeader requestHeader, AbstractRequestResponse response) throws IOException { + ResponseHeader responseHeader = new ResponseHeader(requestHeader.correlationId()); + netOutBuffer = new NetworkSend(node, ResponseSend.serialize(responseHeader, response.toStruct())); + flushNetOutBufferAndUpdateInterestOps(); + } }
http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerCallbackHandler.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerCallbackHandler.java index 1de4a2e..c23e390 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerCallbackHandler.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerCallbackHandler.java @@ -19,12 +19,15 @@ package org.apache.kafka.common.security.authenticator; import java.io.IOException; +import java.util.Map; +import org.apache.kafka.common.security.auth.AuthCallbackHandler; import org.apache.kafka.common.security.kerberos.KerberosShortNamer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import javax.security.auth.Subject; import javax.security.auth.callback.Callback; -import javax.security.auth.callback.CallbackHandler; import javax.security.auth.callback.UnsupportedCallbackException; import javax.security.auth.login.AppConfigurationEntry; import javax.security.auth.login.Configuration; @@ -32,9 +35,16 @@ import javax.security.sasl.AuthorizeCallback; import javax.security.sasl.RealmCallback; import org.apache.kafka.common.security.kerberos.KerberosName; +import org.apache.kafka.common.network.Mode; import org.apache.kafka.common.security.JaasUtils; -public class SaslServerCallbackHandler implements CallbackHandler { +/** + * Callback handler for Sasl servers. The callbacks required for all the SASL + * mechanisms enabled in the server should be supported by this callback handler. See + * <a href="https://docs.oracle.com/javase/8/docs/technotes/guides/security/sasl/sasl-refguide.html">Java SASL API</a> + * for the list of SASL callback handlers required for each SASL mechanism. + */ +public class SaslServerCallbackHandler implements AuthCallbackHandler { private static final Logger LOG = LoggerFactory.getLogger(SaslServerCallbackHandler.class); private final KerberosShortNamer kerberosShortNamer; @@ -45,6 +55,11 @@ public class SaslServerCallbackHandler implements CallbackHandler { this.kerberosShortNamer = kerberosNameParser; } + @Override + public void configure(Map<String, ?> configs, Mode mode, Subject subject, String saslMechanism) { + } + + @Override public void handle(Callback[] callbacks) throws UnsupportedCallbackException { for (Callback callback : callbacks) { if (callback instanceof RealmCallback) { @@ -78,4 +93,7 @@ public class SaslServerCallbackHandler implements CallbackHandler { } } + @Override + public void close() { + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java new file mode 100644 index 0000000..58becdf --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosLogin.java @@ -0,0 +1,392 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.kerberos; + +import javax.security.auth.kerberos.KerberosPrincipal; +import javax.security.auth.login.AppConfigurationEntry; +import javax.security.auth.login.Configuration; +import javax.security.auth.login.LoginContext; +import javax.security.auth.login.LoginException; +import javax.security.auth.kerberos.KerberosTicket; +import javax.security.auth.Subject; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.security.JaasUtils; +import org.apache.kafka.common.security.authenticator.AbstractLogin; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.utils.Shell; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.SystemTime; +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Date; +import java.util.Random; +import java.util.Set; +import java.util.Map; + +/** + * This class is responsible for refreshing Kerberos credentials for + * logins for both Kafka client and server. + */ +public class KerberosLogin extends AbstractLogin { + private static final Logger log = LoggerFactory.getLogger(KerberosLogin.class); + + private static final Random RNG = new Random(); + + private final Time time = new SystemTime(); + private Thread t; + private boolean isKrbTicket; + private boolean isUsingTicketCache; + + private String loginContextName; + private String principal; + + // LoginThread will sleep until 80% of time from last refresh to + // ticket's expiry has been reached, at which time it will wake + // and try to renew the ticket. + private double ticketRenewWindowFactor; + + /** + * Percentage of random jitter added to the renewal time + */ + private double ticketRenewJitter; + + // Regardless of ticketRenewWindowFactor setting above and the ticket expiry time, + // thread will not sleep between refresh attempts any less than 1 minute (60*1000 milliseconds = 1 minute). + // Change the '1' to e.g. 5, to change this to 5 minutes. + private long minTimeBeforeRelogin; + + private String kinitCmd; + + private volatile Subject subject; + + private LoginContext loginContext; + private String serviceName; + private long lastLogin; + + /** + * Login constructor. The constructor starts the thread used + * to periodically re-login to the Kerberos Ticket Granting Server. + * @param loginContextName + * name of section in JAAS file that will be use to login. + * Passed as first param to javax.security.auth.login.LoginContext(). + * @param configs configure Login with the given key-value pairs. + * @throws javax.security.auth.login.LoginException + * Thrown if authentication fails. + */ + public void configure(Map<String, ?> configs, final String loginContextName) { + super.configure(configs, loginContextName); + this.loginContextName = loginContextName; + this.ticketRenewWindowFactor = (Double) configs.get(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR); + this.ticketRenewJitter = (Double) configs.get(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER); + this.minTimeBeforeRelogin = (Long) configs.get(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN); + this.kinitCmd = (String) configs.get(SaslConfigs.SASL_KERBEROS_KINIT_CMD); + this.serviceName = getServiceName(configs, loginContextName); + } + + @Override + public LoginContext login() throws LoginException { + + this.lastLogin = currentElapsedTime(); + loginContext = super.login(); + subject = loginContext.getSubject(); + isKrbTicket = !subject.getPrivateCredentials(KerberosTicket.class).isEmpty(); + + AppConfigurationEntry[] entries = Configuration.getConfiguration().getAppConfigurationEntry(loginContextName); + if (entries.length == 0) { + isUsingTicketCache = false; + principal = null; + } else { + // there will only be a single entry + AppConfigurationEntry entry = entries[0]; + if (entry.getOptions().get("useTicketCache") != null) { + String val = (String) entry.getOptions().get("useTicketCache"); + isUsingTicketCache = val.equals("true"); + } else + isUsingTicketCache = false; + if (entry.getOptions().get("principal") != null) + principal = (String) entry.getOptions().get("principal"); + else + principal = null; + } + + if (!isKrbTicket) { + log.debug("It is not a Kerberos ticket"); + t = null; + // if no TGT, do not bother with ticket management. + return loginContext; + } + log.debug("It is a Kerberos ticket"); + + // Refresh the Ticket Granting Ticket (TGT) periodically. How often to refresh is determined by the + // TGT's existing expiry date and the configured minTimeBeforeRelogin. For testing and development, + // you can decrease the interval of expiration of tickets (for example, to 3 minutes) by running: + // "modprinc -maxlife 3mins <principal>" in kadmin. + t = Utils.newThread("kafka-kerberos-refresh-thread", new Runnable() { + public void run() { + log.info("TGT refresh thread started."); + while (true) { // renewal thread's main loop. if it exits from here, thread will exit. + KerberosTicket tgt = getTGT(); + long now = currentWallTime(); + long nextRefresh; + Date nextRefreshDate; + if (tgt == null) { + nextRefresh = now + minTimeBeforeRelogin; + nextRefreshDate = new Date(nextRefresh); + log.warn("No TGT found: will try again at {}", nextRefreshDate); + } else { + nextRefresh = getRefreshTime(tgt); + long expiry = tgt.getEndTime().getTime(); + Date expiryDate = new Date(expiry); + if (isUsingTicketCache && tgt.getRenewTill() != null && tgt.getRenewTill().getTime() < expiry) { + log.error("The TGT cannot be renewed beyond the next expiry date: {}." + + "This process will not be able to authenticate new SASL connections after that " + + "time (for example, it will not be able to authenticate a new connection with a Kafka " + + "Broker). Ask your system administrator to either increase the " + + "'renew until' time by doing : 'modprinc -maxrenewlife {} ' within " + + "kadmin, or instead, to generate a keytab for {}. Because the TGT's " + + "expiry cannot be further extended by refreshing, exiting refresh thread now.", + expiryDate, principal, principal); + return; + } + // determine how long to sleep from looking at ticket's expiry. + // We should not allow the ticket to expire, but we should take into consideration + // minTimeBeforeRelogin. Will not sleep less than minTimeBeforeRelogin, unless doing so + // would cause ticket expiration. + if ((nextRefresh > expiry) || (now + minTimeBeforeRelogin > expiry)) { + // expiry is before next scheduled refresh). + log.info("Refreshing now because expiry is before next scheduled refresh time."); + nextRefresh = now; + } else { + if (nextRefresh < (now + minTimeBeforeRelogin)) { + // next scheduled refresh is sooner than (now + MIN_TIME_BEFORE_LOGIN). + Date until = new Date(nextRefresh); + Date newUntil = new Date(now + minTimeBeforeRelogin); + log.warn("TGT refresh thread time adjusted from {} to {} since the former is sooner " + + "than the minimum refresh interval ({} seconds) from now.", + until, newUntil, minTimeBeforeRelogin / 1000); + } + nextRefresh = Math.max(nextRefresh, now + minTimeBeforeRelogin); + } + nextRefreshDate = new Date(nextRefresh); + if (nextRefresh > expiry) { + log.error("Next refresh: {} is later than expiry {}. This may indicate a clock skew problem." + + "Check that this host and the KDC hosts' clocks are in sync. Exiting refresh thread.", + nextRefreshDate, expiryDate); + return; + } + } + if (now < nextRefresh) { + Date until = new Date(nextRefresh); + log.info("TGT refresh sleeping until: {}", until); + try { + Thread.sleep(nextRefresh - now); + } catch (InterruptedException ie) { + log.warn("TGT renewal thread has been interrupted and will exit."); + return; + } + } else { + log.error("NextRefresh: {} is in the past: exiting refresh thread. Check" + + " clock sync between this host and KDC - (KDC's clock is likely ahead of this host)." + + " Manual intervention will be required for this client to successfully authenticate." + + " Exiting refresh thread.", nextRefreshDate); + return; + } + if (isUsingTicketCache) { + String kinitArgs = "-R"; + int retry = 1; + while (retry >= 0) { + try { + log.debug("Running ticket cache refresh command: {} {}", kinitCmd, kinitArgs); + Shell.execCommand(kinitCmd, kinitArgs); + break; + } catch (Exception e) { + if (retry > 0) { + --retry; + // sleep for 10 seconds + try { + Thread.sleep(10 * 1000); + } catch (InterruptedException ie) { + log.error("Interrupted while renewing TGT, exiting Login thread"); + return; + } + } else { + log.warn("Could not renew TGT due to problem running shell command: '" + kinitCmd + + " " + kinitArgs + "'" + "; exception was: " + e + ". Exiting refresh thread.", e); + return; + } + } + } + } + try { + int retry = 1; + while (retry >= 0) { + try { + reLogin(); + break; + } catch (LoginException le) { + if (retry > 0) { + --retry; + // sleep for 10 seconds. + try { + Thread.sleep(10 * 1000); + } catch (InterruptedException e) { + log.error("Interrupted during login retry after LoginException:", le); + throw le; + } + } else { + log.error("Could not refresh TGT for principal: " + principal + ".", le); + } + } + } + } catch (LoginException le) { + log.error("Failed to refresh TGT: refresh thread exiting now.", le); + return; + } + } + } + }, true); + t.start(); + return loginContext; + } + + @Override + public void close() { + if ((t != null) && (t.isAlive())) { + t.interrupt(); + try { + t.join(); + } catch (InterruptedException e) { + log.warn("Error while waiting for Login thread to shutdown: " + e, e); + } + } + } + + @Override + public Subject subject() { + return subject; + } + + @Override + public String serviceName() { + return serviceName; + } + + private String getServiceName(Map<String, ?> configs, String loginContext) { + String jaasServiceName; + try { + jaasServiceName = JaasUtils.jaasConfig(loginContext, JaasUtils.SERVICE_NAME); + } catch (IOException e) { + throw new KafkaException("Jaas configuration not found", e); + } + String configServiceName = (String) configs.get(SaslConfigs.SASL_KERBEROS_SERVICE_NAME); + if (jaasServiceName != null && configServiceName != null && !jaasServiceName.equals(configServiceName)) { + String message = "Conflicting serviceName values found in JAAS and Kafka configs " + + "value in JAAS file " + jaasServiceName + ", value in Kafka config " + configServiceName; + throw new IllegalArgumentException(message); + } + + if (jaasServiceName != null) + return jaasServiceName; + if (configServiceName != null) + return configServiceName; + + throw new IllegalArgumentException("No serviceName defined in either JAAS or Kafka config"); + } + + + private long getRefreshTime(KerberosTicket tgt) { + long start = tgt.getStartTime().getTime(); + long expires = tgt.getEndTime().getTime(); + log.info("TGT valid starting at: {}", tgt.getStartTime()); + log.info("TGT expires: {}", tgt.getEndTime()); + long proposedRefresh = start + (long) ((expires - start) * + (ticketRenewWindowFactor + (ticketRenewJitter * RNG.nextDouble()))); + + if (proposedRefresh > expires) + // proposedRefresh is too far in the future: it's after ticket expires: simply return now. + return currentWallTime(); + else + return proposedRefresh; + } + + private synchronized KerberosTicket getTGT() { + Set<KerberosTicket> tickets = subject.getPrivateCredentials(KerberosTicket.class); + for (KerberosTicket ticket : tickets) { + KerberosPrincipal server = ticket.getServer(); + if (server.getName().equals("krbtgt/" + server.getRealm() + "@" + server.getRealm())) { + log.debug("Found TGT {}.", ticket); + return ticket; + } + } + return null; + } + + private boolean hasSufficientTimeElapsed() { + long now = currentElapsedTime(); + if (now - lastLogin < minTimeBeforeRelogin) { + log.warn("Not attempting to re-login since the last re-login was attempted less than {} seconds before.", + minTimeBeforeRelogin / 1000); + return false; + } + return true; + } + + /** + * Re-login a principal. This method assumes that {@link #login(String)} has happened already. + * @throws javax.security.auth.login.LoginException on a failure + */ + private synchronized void reLogin() throws LoginException { + if (!isKrbTicket) { + return; + } + if (loginContext == null) { + throw new LoginException("Login must be done first"); + } + if (!hasSufficientTimeElapsed()) { + return; + } + log.info("Initiating logout for {}", principal); + synchronized (KerberosLogin.class) { + // register most recent relogin attempt + lastLogin = currentElapsedTime(); + //clear up the kerberos state. But the tokens are not cleared! As per + //the Java kerberos login module code, only the kerberos credentials + //are cleared + loginContext.logout(); + //login and also update the subject field of this instance to + //have the new credentials (pass it to the LoginContext constructor) + loginContext = new LoginContext(loginContextName, subject); + log.info("Initiating re-login for {}", principal); + loginContext.login(); + } + } + + private long currentElapsedTime() { + return time.nanoseconds() / 1000000; + } + + private long currentWallTime() { + return time.milliseconds(); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java deleted file mode 100644 index 2e1a056..0000000 --- a/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java +++ /dev/null @@ -1,379 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.common.security.kerberos; - -import javax.security.auth.kerberos.KerberosPrincipal; -import javax.security.auth.login.AppConfigurationEntry; -import javax.security.auth.login.Configuration; -import javax.security.auth.login.LoginContext; -import javax.security.auth.login.LoginException; -import javax.security.auth.callback.CallbackHandler; -import javax.security.auth.kerberos.KerberosTicket; -import javax.security.auth.Subject; - -import org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.ClientCallbackHandler; -import org.apache.kafka.common.security.JaasUtils; -import org.apache.kafka.common.config.SaslConfigs; -import org.apache.kafka.common.utils.Shell; -import org.apache.kafka.common.utils.Time; -import org.apache.kafka.common.utils.SystemTime; - -import org.apache.kafka.common.utils.Utils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Date; -import java.util.Random; -import java.util.Set; -import java.util.Map; - -/** - * This class is responsible for refreshing Kerberos credentials for - * logins for both Kafka client and server. - */ -public class Login { - private static final Logger log = LoggerFactory.getLogger(Login.class); - - private static final Random RNG = new Random(); - - private final Thread t; - private final boolean isKrbTicket; - private final boolean isUsingTicketCache; - - private final String loginContextName; - private final String principal; - private final Time time = new SystemTime(); - private final CallbackHandler callbackHandler = new ClientCallbackHandler(); - - // LoginThread will sleep until 80% of time from last refresh to - // ticket's expiry has been reached, at which time it will wake - // and try to renew the ticket. - private final double ticketRenewWindowFactor; - - /** - * Percentage of random jitter added to the renewal time - */ - private final double ticketRenewJitter; - - // Regardless of ticketRenewWindowFactor setting above and the ticket expiry time, - // thread will not sleep between refresh attempts any less than 1 minute (60*1000 milliseconds = 1 minute). - // Change the '1' to e.g. 5, to change this to 5 minutes. - private final long minTimeBeforeRelogin; - - private final String kinitCmd; - - private volatile Subject subject; - - private LoginContext login; - private long lastLogin; - - /** - * Login constructor. The constructor starts the thread used - * to periodically re-login to the Kerberos Ticket Granting Server. - * @param loginContextName - * name of section in JAAS file that will be use to login. - * Passed as first param to javax.security.auth.login.LoginContext(). - * @param configs configure Login with the given key-value pairs. - * @throws javax.security.auth.login.LoginException - * Thrown if authentication fails. - */ - public Login(final String loginContextName, Map<String, ?> configs) throws LoginException { - this.loginContextName = loginContextName; - this.ticketRenewWindowFactor = (Double) configs.get(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR); - this.ticketRenewJitter = (Double) configs.get(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER); - this.minTimeBeforeRelogin = (Long) configs.get(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN); - this.kinitCmd = (String) configs.get(SaslConfigs.SASL_KERBEROS_KINIT_CMD); - - this.lastLogin = currentElapsedTime(); - login = login(loginContextName); - subject = login.getSubject(); - isKrbTicket = !subject.getPrivateCredentials(KerberosTicket.class).isEmpty(); - - AppConfigurationEntry[] entries = Configuration.getConfiguration().getAppConfigurationEntry(loginContextName); - if (entries.length == 0) { - isUsingTicketCache = false; - principal = null; - } else { - // there will only be a single entry - AppConfigurationEntry entry = entries[0]; - if (entry.getOptions().get("useTicketCache") != null) { - String val = (String) entry.getOptions().get("useTicketCache"); - isUsingTicketCache = val.equals("true"); - } else - isUsingTicketCache = false; - if (entry.getOptions().get("principal") != null) - principal = (String) entry.getOptions().get("principal"); - else - principal = null; - } - - if (!isKrbTicket) { - log.debug("It is not a Kerberos ticket"); - t = null; - // if no TGT, do not bother with ticket management. - return; - } - log.debug("It is a Kerberos ticket"); - - // Refresh the Ticket Granting Ticket (TGT) periodically. How often to refresh is determined by the - // TGT's existing expiry date and the configured minTimeBeforeRelogin. For testing and development, - // you can decrease the interval of expiration of tickets (for example, to 3 minutes) by running: - // "modprinc -maxlife 3mins <principal>" in kadmin. - t = Utils.newThread("kafka-kerberos-refresh-thread", new Runnable() { - public void run() { - log.info("TGT refresh thread started."); - while (true) { // renewal thread's main loop. if it exits from here, thread will exit. - KerberosTicket tgt = getTGT(); - long now = currentWallTime(); - long nextRefresh; - Date nextRefreshDate; - if (tgt == null) { - nextRefresh = now + minTimeBeforeRelogin; - nextRefreshDate = new Date(nextRefresh); - log.warn("No TGT found: will try again at {}", nextRefreshDate); - } else { - nextRefresh = getRefreshTime(tgt); - long expiry = tgt.getEndTime().getTime(); - Date expiryDate = new Date(expiry); - if (isUsingTicketCache && tgt.getRenewTill() != null && tgt.getRenewTill().getTime() < expiry) { - log.error("The TGT cannot be renewed beyond the next expiry date: {}." + - "This process will not be able to authenticate new SASL connections after that " + - "time (for example, it will not be able to authenticate a new connection with a Kafka " + - "Broker). Ask your system administrator to either increase the " + - "'renew until' time by doing : 'modprinc -maxrenewlife {} ' within " + - "kadmin, or instead, to generate a keytab for {}. Because the TGT's " + - "expiry cannot be further extended by refreshing, exiting refresh thread now.", - expiryDate, principal, principal); - return; - } - // determine how long to sleep from looking at ticket's expiry. - // We should not allow the ticket to expire, but we should take into consideration - // minTimeBeforeRelogin. Will not sleep less than minTimeBeforeRelogin, unless doing so - // would cause ticket expiration. - if ((nextRefresh > expiry) || (now + minTimeBeforeRelogin > expiry)) { - // expiry is before next scheduled refresh). - log.info("Refreshing now because expiry is before next scheduled refresh time."); - nextRefresh = now; - } else { - if (nextRefresh < (now + minTimeBeforeRelogin)) { - // next scheduled refresh is sooner than (now + MIN_TIME_BEFORE_LOGIN). - Date until = new Date(nextRefresh); - Date newUntil = new Date(now + minTimeBeforeRelogin); - log.warn("TGT refresh thread time adjusted from {} to {} since the former is sooner " + - "than the minimum refresh interval ({} seconds) from now.", - until, newUntil, minTimeBeforeRelogin / 1000); - } - nextRefresh = Math.max(nextRefresh, now + minTimeBeforeRelogin); - } - nextRefreshDate = new Date(nextRefresh); - if (nextRefresh > expiry) { - log.error("Next refresh: {} is later than expiry {}. This may indicate a clock skew problem." + - "Check that this host and the KDC hosts' clocks are in sync. Exiting refresh thread.", - nextRefreshDate, expiryDate); - return; - } - } - if (now < nextRefresh) { - Date until = new Date(nextRefresh); - log.info("TGT refresh sleeping until: {}", until); - try { - Thread.sleep(nextRefresh - now); - } catch (InterruptedException ie) { - log.warn("TGT renewal thread has been interrupted and will exit."); - return; - } - } else { - log.error("NextRefresh: {} is in the past: exiting refresh thread. Check" - + " clock sync between this host and KDC - (KDC's clock is likely ahead of this host)." - + " Manual intervention will be required for this client to successfully authenticate." - + " Exiting refresh thread.", nextRefreshDate); - return; - } - if (isUsingTicketCache) { - String kinitArgs = "-R"; - int retry = 1; - while (retry >= 0) { - try { - log.debug("Running ticket cache refresh command: {} {}", kinitCmd, kinitArgs); - Shell.execCommand(kinitCmd, kinitArgs); - break; - } catch (Exception e) { - if (retry > 0) { - --retry; - // sleep for 10 seconds - try { - Thread.sleep(10 * 1000); - } catch (InterruptedException ie) { - log.error("Interrupted while renewing TGT, exiting Login thread"); - return; - } - } else { - log.warn("Could not renew TGT due to problem running shell command: '" + kinitCmd - + " " + kinitArgs + "'" + "; exception was: " + e + ". Exiting refresh thread.", e); - return; - } - } - } - } - try { - int retry = 1; - while (retry >= 0) { - try { - reLogin(); - break; - } catch (LoginException le) { - if (retry > 0) { - --retry; - // sleep for 10 seconds. - try { - Thread.sleep(10 * 1000); - } catch (InterruptedException e) { - log.error("Interrupted during login retry after LoginException:", le); - throw le; - } - } else { - log.error("Could not refresh TGT for principal: " + principal + ".", le); - } - } - } - } catch (LoginException le) { - log.error("Failed to refresh TGT: refresh thread exiting now.", le); - return; - } - } - } - }, true); - } - - public void startThreadIfNeeded() { - // thread object 't' will be null if a refresh thread is not needed. - if (t != null) { - t.start(); - } - } - - public void shutdown() { - if ((t != null) && (t.isAlive())) { - t.interrupt(); - try { - t.join(); - } catch (InterruptedException e) { - log.warn("Error while waiting for Login thread to shutdown: " + e, e); - } - } - } - - public Subject subject() { - return subject; - } - - private synchronized LoginContext login(final String loginContextName) throws LoginException { - String jaasConfigFile = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM); - if (jaasConfigFile == null) { - log.debug("System property '" + JaasUtils.JAVA_LOGIN_CONFIG_PARAM + "' is not set, using default JAAS configuration."); - } - AppConfigurationEntry[] configEntries = Configuration.getConfiguration().getAppConfigurationEntry(loginContextName); - if (configEntries == null) { - String errorMessage = "Could not find a '" + loginContextName + "' entry in the JAAS configuration. System property '" + - JaasUtils.JAVA_LOGIN_CONFIG_PARAM + "' is " + (jaasConfigFile == null ? "not set" : jaasConfigFile); - throw new IllegalArgumentException(errorMessage); - } - - LoginContext loginContext = new LoginContext(loginContextName, callbackHandler); - loginContext.login(); - log.info("Successfully logged in."); - return loginContext; - } - - private long getRefreshTime(KerberosTicket tgt) { - long start = tgt.getStartTime().getTime(); - long expires = tgt.getEndTime().getTime(); - log.info("TGT valid starting at: {}", tgt.getStartTime()); - log.info("TGT expires: {}", tgt.getEndTime()); - long proposedRefresh = start + (long) ((expires - start) * - (ticketRenewWindowFactor + (ticketRenewJitter * RNG.nextDouble()))); - - if (proposedRefresh > expires) - // proposedRefresh is too far in the future: it's after ticket expires: simply return now. - return currentWallTime(); - else - return proposedRefresh; - } - - private synchronized KerberosTicket getTGT() { - Set<KerberosTicket> tickets = subject.getPrivateCredentials(KerberosTicket.class); - for (KerberosTicket ticket : tickets) { - KerberosPrincipal server = ticket.getServer(); - if (server.getName().equals("krbtgt/" + server.getRealm() + "@" + server.getRealm())) { - log.debug("Found TGT {}.", ticket); - return ticket; - } - } - return null; - } - - private boolean hasSufficientTimeElapsed() { - long now = currentElapsedTime(); - if (now - lastLogin < minTimeBeforeRelogin) { - log.warn("Not attempting to re-login since the last re-login was attempted less than {} seconds before.", - minTimeBeforeRelogin / 1000); - return false; - } - return true; - } - - /** - * Re-login a principal. This method assumes that {@link #login(String)} has happened already. - * @throws javax.security.auth.login.LoginException on a failure - */ - private synchronized void reLogin() throws LoginException { - if (!isKrbTicket) { - return; - } - if (login == null) { - throw new LoginException("Login must be done first"); - } - if (!hasSufficientTimeElapsed()) { - return; - } - log.info("Initiating logout for {}", principal); - synchronized (Login.class) { - // register most recent relogin attempt - lastLogin = currentElapsedTime(); - //clear up the kerberos state. But the tokens are not cleared! As per - //the Java kerberos login module code, only the kerberos credentials - //are cleared - login.logout(); - //login and also update the subject field of this instance to - //have the new credentials (pass it to the LoginContext constructor) - login = new LoginContext(loginContextName, subject); - log.info("Initiating re-login for {}", principal); - login.login(); - } - } - - private long currentElapsedTime() { - return time.nanoseconds() / 1000000; - } - - private long currentWallTime() { - return time.milliseconds(); - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/security/kerberos/LoginManager.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/kerberos/LoginManager.java b/clients/src/main/java/org/apache/kafka/common/security/kerberos/LoginManager.java deleted file mode 100644 index e163ba8..0000000 --- a/clients/src/main/java/org/apache/kafka/common/security/kerberos/LoginManager.java +++ /dev/null @@ -1,130 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.common.security.kerberos; - -import javax.security.auth.Subject; -import javax.security.auth.login.LoginException; -import java.io.IOException; -import java.util.ArrayList; -import java.util.EnumMap; -import java.util.Map; - -import org.apache.kafka.common.config.SaslConfigs; -import org.apache.kafka.common.network.LoginType; -import org.apache.kafka.common.security.JaasUtils; - -public class LoginManager { - - private static final EnumMap<LoginType, LoginManager> CACHED_INSTANCES = new EnumMap<>(LoginType.class); - - private final Login login; - private final String serviceName; - private final LoginType loginType; - private int refCount; - - private LoginManager(LoginType loginType, Map<String, ?> configs) throws IOException, LoginException { - this.loginType = loginType; - String loginContext = loginType.contextName(); - login = new Login(loginContext, configs); - this.serviceName = getServiceName(loginContext, configs); - login.startThreadIfNeeded(); - } - - private static String getServiceName(String loginContext, Map<String, ?> configs) throws IOException { - String jaasServiceName = JaasUtils.jaasConfig(loginContext, JaasUtils.SERVICE_NAME); - String configServiceName = (String) configs.get(SaslConfigs.SASL_KERBEROS_SERVICE_NAME); - if (jaasServiceName != null && configServiceName != null && !jaasServiceName.equals(configServiceName)) { - String message = "Conflicting serviceName values found in JAAS and Kafka configs " + - "value in JAAS file " + jaasServiceName + ", value in Kafka config " + configServiceName; - throw new IllegalArgumentException(message); - } - - if (jaasServiceName != null) - return jaasServiceName; - if (configServiceName != null) - return configServiceName; - - throw new IllegalArgumentException("No serviceName defined in either JAAS or Kafka config"); - } - - /** - * Returns an instance of `LoginManager` and increases its reference count. - * - * `release()` should be invoked when the `LoginManager` is no longer needed. This method will try to reuse an - * existing `LoginManager` for the provided `mode` if available. However, it expects `configs` to be the same for - * every invocation and it will ignore them in the case where it's returning a cached instance of `LoginManager`. - * - * This is a bit ugly and it would be nicer if we could pass the `LoginManager` to `ChannelBuilders.create` and - * shut it down when the broker or clients are closed. It's straightforward to do the former, but it's more - * complicated to do the latter without making the consumer API more complex. - * - * @param loginType the type of the login context, it should be SERVER for the broker and CLIENT for the clients - * (i.e. consumer and producer) - * @param configs configuration as key/value pairs - */ - public static final LoginManager acquireLoginManager(LoginType loginType, Map<String, ?> configs) throws IOException, LoginException { - synchronized (LoginManager.class) { - LoginManager loginManager = CACHED_INSTANCES.get(loginType); - if (loginManager == null) { - loginManager = new LoginManager(loginType, configs); - CACHED_INSTANCES.put(loginType, loginManager); - } - return loginManager.acquire(); - } - } - - public Subject subject() { - return login.subject(); - } - - public String serviceName() { - return serviceName; - } - - private LoginManager acquire() { - ++refCount; - return this; - } - - /** - * Decrease the reference count for this instance and release resources if it reaches 0. - */ - public void release() { - synchronized (LoginManager.class) { - if (refCount == 0) - throw new IllegalStateException("release called on LoginManager with refCount == 0"); - else if (refCount == 1) { - CACHED_INSTANCES.remove(loginType); - login.shutdown(); - } - --refCount; - } - } - - /* Should only be used in tests. */ - public static void closeAll() { - synchronized (LoginManager.class) { - for (LoginType loginType : new ArrayList<>(CACHED_INSTANCES.keySet())) { - LoginManager loginManager = CACHED_INSTANCES.remove(loginType); - loginManager.login.shutdown(); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/security/plain/PlainLoginModule.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/plain/PlainLoginModule.java b/clients/src/main/java/org/apache/kafka/common/security/plain/PlainLoginModule.java new file mode 100644 index 0000000..f06fbf6 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/security/plain/PlainLoginModule.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.plain; + +import java.util.Map; + +import javax.security.auth.Subject; +import javax.security.auth.callback.CallbackHandler; +import javax.security.auth.login.LoginException; +import javax.security.auth.spi.LoginModule; + +public class PlainLoginModule implements LoginModule { + + private static final String USERNAME_CONFIG = "username"; + private static final String PASSWORD_CONFIG = "password"; + + static { + PlainSaslServerProvider.initialize(); + } + + @Override + public void initialize(Subject subject, CallbackHandler callbackHandler, Map<String, ?> sharedState, Map<String, ?> options) { + String username = (String) options.get(USERNAME_CONFIG); + if (username != null) + subject.getPublicCredentials().add(username); + String password = (String) options.get(PASSWORD_CONFIG); + if (password != null) + subject.getPrivateCredentials().add(password); + } + + @Override + public boolean login() throws LoginException { + return true; + } + + @Override + public boolean logout() throws LoginException { + return true; + } + + @Override + public boolean commit() throws LoginException { + return true; + } + + @Override + public boolean abort() throws LoginException { + return false; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServer.java b/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServer.java new file mode 100644 index 0000000..5c6fd78 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServer.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.plain; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.Arrays; +import java.util.Map; + +import javax.security.auth.callback.CallbackHandler; +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslException; +import javax.security.sasl.SaslServer; +import javax.security.sasl.SaslServerFactory; + +import org.apache.kafka.common.network.LoginType; +import org.apache.kafka.common.security.JaasUtils; + +/** + * Simple SaslServer implementation for SASL/PLAIN. In order to make this implementation + * fully pluggable, authentication of username/password is fully contained within the + * server implementation. + * <p> + * Valid users with passwords are specified in the Jaas configuration file. Each user + * is specified with user_<username> as key and <password> as value. This is consistent + * with Zookeeper Digest-MD5 implementation. + * <p> + * To avoid storing clear passwords on disk or to integrate with external authentication + * servers in production systems, this module can be replaced with a different implementation. + * + */ +public class PlainSaslServer implements SaslServer { + + public static final String PLAIN_MECHANISM = "PLAIN"; + private static final String JAAS_USER_PREFIX = "user_"; + + private boolean complete; + private String authorizationID; + + public PlainSaslServer(CallbackHandler callbackHandler) { + } + + @Override + public byte[] evaluateResponse(byte[] response) throws SaslException { + /* + * Message format (from https://tools.ietf.org/html/rfc4616): + * + * message = [authzid] UTF8NUL authcid UTF8NUL passwd + * authcid = 1*SAFE ; MUST accept up to 255 octets + * authzid = 1*SAFE ; MUST accept up to 255 octets + * passwd = 1*SAFE ; MUST accept up to 255 octets + * UTF8NUL = %x00 ; UTF-8 encoded NUL character + * + * SAFE = UTF1 / UTF2 / UTF3 / UTF4 + * ;; any UTF-8 encoded Unicode character except NUL + */ + + String[] tokens; + try { + tokens = new String(response, "UTF-8").split("\u0000"); + } catch (UnsupportedEncodingException e) { + throw new SaslException("UTF-8 encoding not supported", e); + } + if (tokens.length != 3) + throw new SaslException("Invalid SASL/PLAIN response: expected 3 tokens, got " + tokens.length); + authorizationID = tokens[0]; + String username = tokens[1]; + String password = tokens[2]; + + if (username.isEmpty()) { + throw new SaslException("Authentication failed: username not specified"); + } + if (password.isEmpty()) { + throw new SaslException("Authentication failed: password not specified"); + } + if (authorizationID.isEmpty()) + authorizationID = username; + + try { + String expectedPassword = JaasUtils.jaasConfig(LoginType.SERVER.contextName(), JAAS_USER_PREFIX + username); + if (!password.equals(expectedPassword)) { + throw new SaslException("Authentication failed: Invalid username or password"); + } + } catch (IOException e) { + throw new SaslException("Authentication failed: Invalid JAAS configuration", e); + } + complete = true; + return new byte[0]; + } + + @Override + public String getAuthorizationID() { + if (!complete) + throw new IllegalStateException("Authentication exchange has not completed"); + return authorizationID; + } + + @Override + public String getMechanismName() { + return PLAIN_MECHANISM; + } + + @Override + public Object getNegotiatedProperty(String propName) { + if (!complete) + throw new IllegalStateException("Authentication exchange has not completed"); + return null; + } + + @Override + public boolean isComplete() { + return complete; + } + + @Override + public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException { + if (!complete) + throw new IllegalStateException("Authentication exchange has not completed"); + return Arrays.copyOfRange(incoming, offset, offset + len); + } + + @Override + public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException { + if (!complete) + throw new IllegalStateException("Authentication exchange has not completed"); + return Arrays.copyOfRange(outgoing, offset, offset + len); + } + + @Override + public void dispose() throws SaslException { + } + + public static class PlainSaslServerFactory implements SaslServerFactory { + + @Override + public SaslServer createSaslServer(String mechanism, String protocol, String serverName, Map<String, ?> props, CallbackHandler cbh) + throws SaslException { + + if (!PLAIN_MECHANISM.equals(mechanism)) { + throw new SaslException(String.format("Mechanism \'%s\' is not supported. Only PLAIN is supported.", mechanism)); + } + return new PlainSaslServer(cbh); + } + + @Override + public String[] getMechanismNames(Map<String, ?> props) { + String noPlainText = (String) props.get(Sasl.POLICY_NOPLAINTEXT); + if ("true".equals(noPlainText)) + return new String[]{}; + else + return new String[]{PLAIN_MECHANISM}; + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServerProvider.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServerProvider.java b/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServerProvider.java new file mode 100644 index 0000000..c3db1f5 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServerProvider.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.plain; + +import java.security.Provider; +import java.security.Security; + +import org.apache.kafka.common.security.plain.PlainSaslServer.PlainSaslServerFactory; + +public class PlainSaslServerProvider extends Provider { + + private static final long serialVersionUID = 1L; + + protected PlainSaslServerProvider() { + super("Simple SASL/PLAIN Server Provider", 1.0, "Simple SASL/PLAIN Server Provider for Kafka"); + super.put("SaslServerFactory." + PlainSaslServer.PLAIN_MECHANISM, PlainSaslServerFactory.class.getName()); + } + + public static void initialize() { + Security.addProvider(new PlainSaslServerProvider()); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 9def557..92f3101 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -91,7 +91,10 @@ public class RequestResponseTest { createUpdateMetadataResponse(), createLeaderAndIsrRequest(), createLeaderAndIsrRequest().getErrorResponse(0, new UnknownServerException()), - createLeaderAndIsrResponse() + createLeaderAndIsrResponse(), + createSaslHandshakeRequest(), + createSaslHandshakeRequest().getErrorResponse(0, new UnknownServerException()), + createSaslHandshakeResponse() ); for (AbstractRequestResponse req : requestResponseList) @@ -425,5 +428,11 @@ public class RequestResponseTest { return new UpdateMetadataResponse(Errors.NONE.code()); } + private AbstractRequest createSaslHandshakeRequest() { + return new SaslHandshakeRequest("PLAIN"); + } + private AbstractRequestResponse createSaslHandshakeResponse() { + return new SaslHandshakeResponse(Errors.NONE.code(), Collections.singletonList("GSSAPI")); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java index fbe2630..91e921f 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java @@ -210,6 +210,7 @@ public class TestSslUtils { X509Certificate cCert = generateCertificate("CN=" + host + ", O=A client", cKP, 30, "SHA1withRSA"); createKeyStore(keyStoreFile.getPath(), password, "client", cKP.getPrivate(), cCert); certs.put(certAlias, cCert); + keyStoreFile.deleteOnExit(); } else if (mode == Mode.SERVER) { keyStoreFile = File.createTempFile("serverKS", ".jks"); KeyPair sKP = generateKeyPair("RSA"); @@ -217,10 +218,12 @@ public class TestSslUtils { "SHA1withRSA"); createKeyStore(keyStoreFile.getPath(), password, password, "server", sKP.getPrivate(), sCert); certs.put(certAlias, sCert); + keyStoreFile.deleteOnExit(); } if (trustStore) { createTrustStore(trustStoreFile.getPath(), trustStorePassword, certs); + trustStoreFile.deleteOnExit(); } return createSslConfig(mode, keyStoreFile, password, password, trustStoreFile, trustStorePassword); http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/core/src/main/scala/kafka/common/ErrorMapping.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala index 9708c4e..91a1d75 100644 --- a/core/src/main/scala/kafka/common/ErrorMapping.scala +++ b/core/src/main/scala/kafka/common/ErrorMapping.scala @@ -63,6 +63,8 @@ object ErrorMapping { val GroupAuthorizationCode: Short = 30 val ClusterAuthorizationCode: Short = 31 // 32: INVALID_TIMESTAMP + // 33: UNSUPPORTED_SASL_MECHANISM + // 34: ILLEGAL_SASL_STATE private val exceptionToCode = Map[Class[Throwable], Short]( http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index e9731fd..018946e 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -89,6 +89,14 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf val brokerEndPoint = broker.getBrokerEndPoint(config.interBrokerSecurityProtocol) val brokerNode = new Node(broker.id, brokerEndPoint.host, brokerEndPoint.port) val networkClient = { + val channelBuilder = ChannelBuilders.create( + config.interBrokerSecurityProtocol, + Mode.CLIENT, + LoginType.SERVER, + config.values, + config.saslMechanismInterBrokerProtocol, + config.saslInterBrokerHandshakeRequestEnable + ) val selector = new Selector( NetworkReceive.UNLIMITED, config.connectionsMaxIdleMs, @@ -97,7 +105,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf "controller-channel", Map("broker-id" -> broker.id.toString).asJava, false, - ChannelBuilders.create(config.interBrokerSecurityProtocol, Mode.CLIENT, LoginType.SERVER, config.values) + channelBuilder ) new NetworkClient( selector, http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/core/src/main/scala/kafka/network/SocketServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index f1ec2ef..b757abd 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -399,7 +399,7 @@ private[kafka] class Processor(val id: Int, "socket-server", metricTags, false, - ChannelBuilders.create(protocol, Mode.SERVER, LoginType.SERVER, channelConfigs)) + ChannelBuilders.create(protocol, Mode.SERVER, LoginType.SERVER, channelConfigs, null, true)) override def run() { startupComplete() http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 4f77d30..9afefa5 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -49,6 +49,7 @@ import org.apache.kafka.common.internals.TopicConstants import scala.collection._ import scala.collection.JavaConverters._ +import org.apache.kafka.common.requests.SaslHandshakeResponse /** * Logic to handle the various Kafka requests @@ -93,6 +94,7 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request) case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request) case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request) + case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request) case requestId => throw new KafkaException("Unknown api code " + requestId) } } catch { @@ -993,6 +995,12 @@ class KafkaApis(val requestChannel: RequestChannel, } } + def handleSaslHandshakeRequest(request: RequestChannel.Request) { + val respHeader = new ResponseHeader(request.header.correlationId) + val response = new SaslHandshakeResponse(Errors.ILLEGAL_SASL_STATE.code, config.saslEnabledMechanisms) + requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, respHeader, response))) + } + def close() { quotaManagers.foreach { case (apiKey, quotaManager) => quotaManager.shutdown() http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/core/src/main/scala/kafka/server/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 5c0d27a..5e28bd7 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -19,7 +19,7 @@ package kafka.server import java.util.Properties -import kafka.api.ApiVersion +import kafka.api.{ApiVersion, KAFKA_0_10_0_IV0} import kafka.cluster.EndPoint import kafka.consumer.ConsumerConfig import kafka.coordinator.OffsetConfig @@ -175,6 +175,8 @@ object Defaults { val SslClientAuth = SslClientAuthNone /** ********* Sasl configuration ***********/ + val SaslMechanismInterBrokerProtocol = SaslConfigs.DEFAULT_SASL_MECHANISM + val SaslEnabledMechanisms = SaslConfigs.DEFAULT_SASL_ENABLED_MECHANISMS val SaslKerberosKinitCmd = SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD val SaslKerberosTicketRenewWindowFactor = SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR val SaslKerberosTicketRenewJitter = SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER @@ -337,6 +339,8 @@ object KafkaConfig { val SslClientAuthProp = SslConfigs.SSL_CLIENT_AUTH_CONFIG /** ********* SASL Configuration ****************/ + val SaslMechanismInterBrokerProtocolProp = "sasl.mechanism.inter.broker.protocol" + val SaslEnabledMechanismsProp = SaslConfigs.SASL_ENABLED_MECHANISMS val SaslKerberosServiceNameProp = SaslConfigs.SASL_KERBEROS_SERVICE_NAME val SaslKerberosKinitCmdProp = SaslConfigs.SASL_KERBEROS_KINIT_CMD val SaslKerberosTicketRenewWindowFactorProp = SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR @@ -533,6 +537,8 @@ object KafkaConfig { val SslClientAuthDoc = SslConfigs.SSL_CLIENT_AUTH_DOC /** ********* Sasl Configuration ****************/ + val SaslMechanismInterBrokerProtocolDoc = "SASL mechanism used for inter-broker communication. Default is GSSAPI." + val SaslEnabledMechanismsDoc = SaslConfigs.SASL_ENABLED_MECHANISMS_DOC val SaslKerberosServiceNameDoc = SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC val SaslKerberosKinitCmdDoc = SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC val SaslKerberosTicketRenewWindowFactorDoc = SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC @@ -704,6 +710,8 @@ object KafkaConfig { .define(SslCipherSuitesProp, LIST, null, MEDIUM, SslCipherSuitesDoc) /** ********* Sasl Configuration ****************/ + .define(SaslMechanismInterBrokerProtocolProp, STRING, Defaults.SaslMechanismInterBrokerProtocol, MEDIUM, SaslMechanismInterBrokerProtocolDoc) + .define(SaslEnabledMechanismsProp, LIST, Defaults.SaslEnabledMechanisms, MEDIUM, SaslEnabledMechanismsDoc) .define(SaslKerberosServiceNameProp, STRING, null, MEDIUM, SaslKerberosServiceNameDoc) .define(SaslKerberosKinitCmdProp, STRING, Defaults.SaslKerberosKinitCmd, MEDIUM, SaslKerberosKinitCmdDoc) .define(SaslKerberosTicketRenewWindowFactorProp, DOUBLE, Defaults.SaslKerberosTicketRenewWindowFactor, MEDIUM, SaslKerberosTicketRenewWindowFactorDoc) @@ -894,12 +902,15 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra val sslCipher = getList(KafkaConfig.SslCipherSuitesProp) /** ********* Sasl Configuration **************/ + val saslMechanismInterBrokerProtocol = getString(KafkaConfig.SaslMechanismInterBrokerProtocolProp) + val saslEnabledMechanisms = getList(KafkaConfig.SaslEnabledMechanismsProp) val saslKerberosServiceName = getString(KafkaConfig.SaslKerberosServiceNameProp) val saslKerberosKinitCmd = getString(KafkaConfig.SaslKerberosKinitCmdProp) val saslKerberosTicketRenewWindowFactor = getDouble(KafkaConfig.SaslKerberosTicketRenewWindowFactorProp) val saslKerberosTicketRenewJitter = getDouble(KafkaConfig.SaslKerberosTicketRenewJitterProp) val saslKerberosMinTimeBeforeRelogin = getLong(KafkaConfig.SaslKerberosMinTimeBeforeReloginProp) val saslKerberosPrincipalToLocalRules = getList(KafkaConfig.SaslKerberosPrincipalToLocalRulesProp) + val saslInterBrokerHandshakeRequestEnable = interBrokerProtocolVersion >= KAFKA_0_10_0_IV0 /** ********* Quota Configuration **************/ val producerQuotaBytesPerSecondDefault = getLong(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp) @@ -1009,5 +1020,10 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra ) require(interBrokerProtocolVersion >= logMessageFormatVersion, s"log.message.format.version $logMessageFormatVersionString cannot be used when inter.broker.protocol.version is set to $interBrokerProtocolVersionString") + val interBrokerUsesSasl = interBrokerSecurityProtocol == SecurityProtocol.SASL_PLAINTEXT || interBrokerSecurityProtocol == SecurityProtocol.SASL_SSL + require(!interBrokerUsesSasl || saslInterBrokerHandshakeRequestEnable || saslMechanismInterBrokerProtocol == SaslConfigs.GSSAPI_MECHANISM, + s"Only GSSAPI mechanism is supported for inter-broker communication with SASL when inter.broker.protocol.version is set to $interBrokerProtocolVersionString") + require(!interBrokerUsesSasl || saslEnabledMechanisms.contains(saslMechanismInterBrokerProtocol), + s"${KafkaConfig.SaslMechanismInterBrokerProtocolProp} must be included in ${KafkaConfig.SaslEnabledMechanismsProp} when SASL is used for inter-broker communication") } }
