KAFKA-3149; Extend SASL implementation to support more mechanisms Code changes corresponding to KIP-43 to enable review of the KIP.
Author: Rajini Sivaram <[email protected]> Reviewers: Jun Rao <[email protected]>, Ismael Juma <[email protected]> Closes #812 from rajinisivaram/KAFKA-3149 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5b375d7b Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5b375d7b Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5b375d7b Branch: refs/heads/trunk Commit: 5b375d7bf9b26aaeed06bac2dc5de3f8214cbad4 Parents: cd427c9 Author: Rajini Sivaram <[email protected]> Authored: Tue Apr 26 16:56:42 2016 -0700 Committer: Ismael Juma <[email protected]> Committed: Tue Apr 26 16:56:42 2016 -0700 ---------------------------------------------------------------------- checkstyle/import-control.xml | 7 + .../org/apache/kafka/clients/ClientUtils.java | 4 +- .../org/apache/kafka/clients/NetworkClient.java | 20 +- .../apache/kafka/common/config/SaslConfigs.java | 15 +- .../common/errors/AuthenticationException.java | 27 ++ .../errors/IllegalSaslStateException.java | 27 ++ .../UnsupportedSaslMechanismException.java | 27 ++ .../kafka/common/network/ChannelBuilders.java | 14 +- .../common/network/SaslChannelBuilder.java | 36 +- .../apache/kafka/common/protocol/ApiKeys.java | 3 +- .../apache/kafka/common/protocol/Errors.java | 8 +- .../apache/kafka/common/protocol/Protocol.java | 13 + .../kafka/common/requests/AbstractRequest.java | 2 + .../kafka/common/requests/ResponseSend.java | 2 +- .../common/requests/SaslHandshakeRequest.java | 83 ++++ .../common/requests/SaslHandshakeResponse.java | 85 ++++ .../security/auth/AuthCallbackHandler.java | 46 +++ .../kafka/common/security/auth/Login.java | 57 +++ .../security/authenticator/AbstractLogin.java | 108 +++++ .../security/authenticator/DefaultLogin.java | 32 ++ .../security/authenticator/LoginManager.java | 112 ++++++ .../authenticator/SaslClientAuthenticator.java | 242 ++++++++---- .../SaslClientCallbackHandler.java | 94 +++++ .../authenticator/SaslServerAuthenticator.java | 195 +++++++-- .../SaslServerCallbackHandler.java | 22 +- .../common/security/kerberos/KerberosLogin.java | 392 +++++++++++++++++++ .../kafka/common/security/kerberos/Login.java | 379 ------------------ .../common/security/kerberos/LoginManager.java | 130 ------ .../common/security/plain/PlainLoginModule.java | 66 ++++ .../common/security/plain/PlainSaslServer.java | 170 ++++++++ .../security/plain/PlainSaslServerProvider.java | 38 ++ .../common/requests/RequestResponseTest.java | 11 +- .../org/apache/kafka/test/TestSslUtils.java | 3 + .../main/scala/kafka/common/ErrorMapping.scala | 2 + .../controller/ControllerChannelManager.scala | 10 +- .../main/scala/kafka/network/SocketServer.scala | 2 +- .../src/main/scala/kafka/server/KafkaApis.scala | 8 + .../main/scala/kafka/server/KafkaConfig.scala | 18 +- .../main/scala/kafka/server/KafkaServer.scala | 9 +- .../kafka/server/ReplicaFetcherThread.scala | 10 +- .../kafka/api/BaseConsumerTest.scala | 9 +- .../kafka/api/BaseProducerSendTest.scala | 4 +- .../kafka/api/EndToEndAuthorizationTest.scala | 4 +- .../kafka/api/IntegrationTestHarness.scala | 8 +- .../kafka/api/PlaintextConsumerTest.scala | 2 +- .../api/SaslMultiMechanismConsumerTest.scala | 86 ++++ .../api/SaslPlainPlaintextConsumerTest.scala | 27 ++ .../scala/integration/kafka/api/SaslSetup.scala | 36 +- .../integration/kafka/api/SaslTestHarness.scala | 22 +- .../integration/KafkaServerTestHarness.scala | 4 +- .../unit/kafka/server/KafkaConfigTest.scala | 2 + .../scala/unit/kafka/utils/JaasTestUtils.scala | 122 ++++-- .../test/scala/unit/kafka/utils/TestUtils.scala | 40 +- 53 files changed, 2171 insertions(+), 724 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/checkstyle/import-control.xml ---------------------------------------------------------------------- diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 051c8d1..e94698c 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -69,6 +69,13 @@ <allow pkg="org.apache.kafka.common.annotation" /> <allow pkg="org.apache.kafka.common.network" /> <allow pkg="org.apache.kafka.common.config" /> + <subpackage name="authenticator"> + <allow pkg="org.apache.kafka.common.protocol" /> + <allow pkg="org.apache.kafka.common.protocol.types" /> + <allow pkg="org.apache.kafka.common.requests" /> + <allow pkg="org.apache.kafka.common.errors" /> + <allow pkg="org.apache.kafka.clients" /> + </subpackage> </subpackage> <subpackage name="protocol"> http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java index 0201257..ad9c5d0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.network.Mode; import org.apache.kafka.common.protocol.SecurityProtocol; import org.apache.kafka.common.network.ChannelBuilder; import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.config.SaslConfigs; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,7 +77,8 @@ public class ClientUtils { SecurityProtocol securityProtocol = SecurityProtocol.forName((String) configs.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)); if (!SecurityProtocol.nonTestingValues().contains(securityProtocol)) throw new ConfigException("Invalid SecurityProtocol " + securityProtocol); - return ChannelBuilders.create(securityProtocol, Mode.CLIENT, LoginType.CLIENT, configs); + String clientSaslMechanism = (String) configs.get(SaslConfigs.SASL_MECHANISM); + return ChannelBuilders.create(securityProtocol, Mode.CLIENT, LoginType.CLIENT, configs, clientSaslMechanism, true); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index d2eaace..cc5dc6f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.InetSocketAddress; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -373,6 +374,16 @@ public class NetworkClient implements KafkaClient { return found; } + public static Struct parseResponse(ByteBuffer responseBuffer, RequestHeader requestHeader) { + ResponseHeader responseHeader = ResponseHeader.parse(responseBuffer); + // Always expect the response version id to be the same as the request version id + short apiKey = requestHeader.apiKey(); + short apiVer = requestHeader.apiVersion(); + Struct responseBody = ProtoUtils.responseSchema(apiKey, apiVer).read(responseBuffer); + correlate(requestHeader, responseHeader); + return responseBody; + } + /** * Post process disconnection of a node * @@ -437,12 +448,7 @@ public class NetworkClient implements KafkaClient { for (NetworkReceive receive : this.selector.completedReceives()) { String source = receive.source(); ClientRequest req = inFlightRequests.completeNext(source); - ResponseHeader header = ResponseHeader.parse(receive.payload()); - // Always expect the response version id to be the same as the request version id - short apiKey = req.request().header().apiKey(); - short apiVer = req.request().header().apiVersion(); - Struct body = ProtoUtils.responseSchema(apiKey, apiVer).read(receive.payload()); - correlate(req.request().header(), header); + Struct body = parseResponse(receive.payload(), req.request().header()); if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body)) responses.add(new ClientResponse(req, now, false, body)); } @@ -477,7 +483,7 @@ public class NetworkClient implements KafkaClient { /** * Validate that the response corresponds to the request we expect or else explode */ - private void correlate(RequestHeader requestHeader, ResponseHeader responseHeader) { + private static void correlate(RequestHeader requestHeader, ResponseHeader responseHeader) { if (requestHeader.correlationId() != responseHeader.correlationId()) throw new IllegalStateException("Correlation id for response (" + responseHeader.correlationId() + ") does not match request (" + requestHeader.correlationId() + ")"); http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java index d61838f..d3aa0d6 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java +++ b/clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java @@ -20,6 +20,17 @@ public class SaslConfigs { /* * NOTE: DO NOT CHANGE EITHER CONFIG NAMES AS THESE ARE PART OF THE PUBLIC API AND CHANGE WILL BREAK USER CODE. */ + /** SASL mechanism configuration - standard mechanism names are listed <a href="http://www.iana.org/assignments/sasl-mechanisms/sasl-mechanisms.xhtml">here</a>. */ + public static final String SASL_MECHANISM = "sasl.mechanism"; + public static final String SASL_MECHANISM_DOC = "SASL mechanism used for client connections. This may be any mechanism for which a security provider is available. GSSAPI is the default mechanism."; + public static final String GSSAPI_MECHANISM = "GSSAPI"; + public static final String DEFAULT_SASL_MECHANISM = GSSAPI_MECHANISM; + + public static final String SASL_ENABLED_MECHANISMS = "sasl.enabled.mechanisms"; + public static final String SASL_ENABLED_MECHANISMS_DOC = "The list of SASL mechanisms enabled in the Kafka server. " + + "The list may contain any mechanism for which a security provider is available. " + + "Only GSSAPI is enabled by default."; + public static final List<String> DEFAULT_SASL_ENABLED_MECHANISMS = Collections.singletonList(GSSAPI_MECHANISM); public static final String SASL_KERBEROS_SERVICE_NAME = "sasl.kerberos.service.name"; public static final String SASL_KERBEROS_SERVICE_NAME_DOC = "The Kerberos principal name that Kafka runs as. " @@ -54,7 +65,7 @@ public class SaslConfigs { .define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, ConfigDef.Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC) .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, ConfigDef.Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC) .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, ConfigDef.Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER_DOC) - .define(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, ConfigDef.Type.LONG, SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC); + .define(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, ConfigDef.Type.LONG, SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC) + .define(SaslConfigs.SASL_MECHANISM, ConfigDef.Type.STRING, SaslConfigs.DEFAULT_SASL_MECHANISM, ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_MECHANISM_DOC); } - } http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationException.java b/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationException.java new file mode 100644 index 0000000..7b60e11 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/AuthenticationException.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.errors; + +public class AuthenticationException extends ApiException { + + private static final long serialVersionUID = 1L; + + public AuthenticationException(String message) { + super(message); + } + + public AuthenticationException(String message, Throwable cause) { + super(message, cause); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/errors/IllegalSaslStateException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/errors/IllegalSaslStateException.java b/clients/src/main/java/org/apache/kafka/common/errors/IllegalSaslStateException.java new file mode 100644 index 0000000..7fd008c --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/IllegalSaslStateException.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.errors; + +public class IllegalSaslStateException extends AuthenticationException { + + private static final long serialVersionUID = 1L; + + public IllegalSaslStateException(String message) { + super(message); + } + + public IllegalSaslStateException(String message, Throwable cause) { + super(message, cause); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedSaslMechanismException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedSaslMechanismException.java b/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedSaslMechanismException.java new file mode 100644 index 0000000..289a09f --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedSaslMechanismException.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.errors; + +public class UnsupportedSaslMechanismException extends AuthenticationException { + + private static final long serialVersionUID = 1L; + + public UnsupportedSaslMechanismException(String message) { + super(message); + } + + public UnsupportedSaslMechanismException(String message, Throwable cause) { + super(message, cause); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java index 669f269..2d6ba8a 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java +++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java @@ -31,10 +31,18 @@ public class ChannelBuilders { * it is ignored otherwise * @param loginType the loginType, it must be non-null if `securityProtocol` is SASL_*; it is ignored otherwise * @param configs client/server configs + * @param clientSaslMechanism SASL mechanism if mode is CLIENT, ignored otherwise + * @param saslHandshakeRequestEnable flag to enable Sasl handshake requests; disabled only for SASL + * inter-broker connections with inter-broker protocol version < 0.10 * @return the configured `ChannelBuilder` * @throws IllegalArgumentException if `mode` invariants described above is not maintained */ - public static ChannelBuilder create(SecurityProtocol securityProtocol, Mode mode, LoginType loginType, Map<String, ?> configs) { + public static ChannelBuilder create(SecurityProtocol securityProtocol, + Mode mode, + LoginType loginType, + Map<String, ?> configs, + String clientSaslMechanism, + boolean saslHandshakeRequestEnable) { ChannelBuilder channelBuilder; switch (securityProtocol) { @@ -47,7 +55,9 @@ public class ChannelBuilders { requireNonNullMode(mode, securityProtocol); if (loginType == null) throw new IllegalArgumentException("`loginType` must be non-null if `securityProtocol` is `" + securityProtocol + "`"); - channelBuilder = new SaslChannelBuilder(mode, loginType, securityProtocol); + if (mode == Mode.CLIENT && clientSaslMechanism == null) + throw new IllegalArgumentException("`clientSaslMechanism` must be non-null in client mode if `securityProtocol` is `" + securityProtocol + "`"); + channelBuilder = new SaslChannelBuilder(mode, loginType, securityProtocol, clientSaslMechanism, saslHandshakeRequestEnable); break; case PLAINTEXT: case TRACE: http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java index 0cd5bfe..a0464bc 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java @@ -21,13 +21,12 @@ import java.util.Map; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.security.JaasUtils; import org.apache.kafka.common.security.kerberos.KerberosShortNamer; -import org.apache.kafka.common.security.kerberos.LoginManager; +import org.apache.kafka.common.security.authenticator.LoginManager; import org.apache.kafka.common.security.authenticator.SaslClientAuthenticator; import org.apache.kafka.common.security.authenticator.SaslServerAuthenticator; import org.apache.kafka.common.security.ssl.SslFactory; import org.apache.kafka.common.protocol.SecurityProtocol; import org.apache.kafka.common.KafkaException; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,24 +34,34 @@ public class SaslChannelBuilder implements ChannelBuilder { private static final Logger log = LoggerFactory.getLogger(SaslChannelBuilder.class); private final SecurityProtocol securityProtocol; + private final String clientSaslMechanism; private final Mode mode; private final LoginType loginType; + private final boolean handshakeRequestEnable; private LoginManager loginManager; private SslFactory sslFactory; private Map<String, ?> configs; private KerberosShortNamer kerberosShortNamer; - public SaslChannelBuilder(Mode mode, LoginType loginType, SecurityProtocol securityProtocol) { + public SaslChannelBuilder(Mode mode, LoginType loginType, SecurityProtocol securityProtocol, String clientSaslMechanism, boolean handshakeRequestEnable) { this.mode = mode; this.loginType = loginType; this.securityProtocol = securityProtocol; + this.handshakeRequestEnable = handshakeRequestEnable; + this.clientSaslMechanism = clientSaslMechanism; } public void configure(Map<String, ?> configs) throws KafkaException { try { this.configs = configs; - this.loginManager = LoginManager.acquireLoginManager(loginType, configs); + boolean hasKerberos; + if (mode == Mode.SERVER) { + List<String> enabledMechanisms = (List<String>) this.configs.get(SaslConfigs.SASL_ENABLED_MECHANISMS); + hasKerberos = enabledMechanisms == null || enabledMechanisms.contains(SaslConfigs.GSSAPI_MECHANISM); + } else { + hasKerberos = clientSaslMechanism.equals(SaslConfigs.GSSAPI_MECHANISM); + } String defaultRealm; try { @@ -61,10 +70,13 @@ public class SaslChannelBuilder implements ChannelBuilder { defaultRealm = ""; } - @SuppressWarnings("unchecked") - List<String> principalToLocalRules = (List<String>) configs.get(SaslConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES); - if (principalToLocalRules != null) - kerberosShortNamer = KerberosShortNamer.fromUnparsedRules(defaultRealm, principalToLocalRules); + if (hasKerberos) { + @SuppressWarnings("unchecked") + List<String> principalToLocalRules = (List<String>) configs.get(SaslConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES); + if (principalToLocalRules != null) + kerberosShortNamer = KerberosShortNamer.fromUnparsedRules(defaultRealm, principalToLocalRules); + } + this.loginManager = LoginManager.acquireLoginManager(loginType, hasKerberos, configs); if (this.securityProtocol == SecurityProtocol.SASL_SSL) { // Disable SSL client authentication as we are using SASL authentication @@ -82,10 +94,11 @@ public class SaslChannelBuilder implements ChannelBuilder { TransportLayer transportLayer = buildTransportLayer(id, key, socketChannel); Authenticator authenticator; if (mode == Mode.SERVER) - authenticator = new SaslServerAuthenticator(id, loginManager.subject(), kerberosShortNamer, maxReceiveSize); + authenticator = new SaslServerAuthenticator(id, loginManager.subject(), kerberosShortNamer, + socketChannel.socket().getLocalAddress().getHostName(), maxReceiveSize); else authenticator = new SaslClientAuthenticator(id, loginManager.subject(), loginManager.serviceName(), - socketChannel.socket().getInetAddress().getHostName()); + socketChannel.socket().getInetAddress().getHostName(), clientSaslMechanism, handshakeRequestEnable); // Both authenticators don't use `PrincipalBuilder`, so we pass `null` for now. Reconsider if this changes. authenticator.configure(transportLayer, null, this.configs); return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize); @@ -96,7 +109,8 @@ public class SaslChannelBuilder implements ChannelBuilder { } public void close() { - this.loginManager.release(); + if (this.loginManager != null) + this.loginManager.release(); } protected TransportLayer buildTransportLayer(String id, SelectionKey key, SocketChannel socketChannel) throws IOException { http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index e8fd3d3..512a121 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -36,7 +36,8 @@ public enum ApiKeys { LEAVE_GROUP(13, "LeaveGroup"), SYNC_GROUP(14, "SyncGroup"), DESCRIBE_GROUPS(15, "DescribeGroups"), - LIST_GROUPS(16, "ListGroups"); + LIST_GROUPS(16, "ListGroups"), + SASL_HANDSHAKE(17, "SaslHandshake"); private static final ApiKeys[] ID_TO_TYPE; private static final int MIN_API_KEY = 0; http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index 0f33516..9013399 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -28,6 +28,7 @@ import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException; import org.apache.kafka.common.errors.GroupLoadInProgressException; import org.apache.kafka.common.errors.IllegalGenerationException; +import org.apache.kafka.common.errors.IllegalSaslStateException; import org.apache.kafka.common.errors.InconsistentGroupProtocolException; import org.apache.kafka.common.errors.InvalidCommitOffsetSizeException; import org.apache.kafka.common.errors.InvalidFetchSizeException; @@ -49,6 +50,7 @@ import org.apache.kafka.common.errors.RecordBatchTooLargeException; import org.apache.kafka.common.errors.RecordTooLargeException; import org.apache.kafka.common.errors.ReplicaNotAvailableException; import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.UnsupportedSaslMechanismException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.errors.UnknownMemberIdException; @@ -130,7 +132,11 @@ public enum Errors { CLUSTER_AUTHORIZATION_FAILED(31, new ClusterAuthorizationException("Cluster authorization failed.")), INVALID_TIMESTAMP(32, - new InvalidTimestampException("The timestamp of the message is out of acceptable range.")); + new InvalidTimestampException("The timestamp of the message is out of acceptable range.")), + UNSUPPORTED_SASL_MECHANISM(33, + new UnsupportedSaslMechanismException("The broker does not support the requested SASL mechanism.")), + ILLEGAL_SASL_STATE(34, + new IllegalSaslStateException("Request is not valid given the current SASL state.")); private static final Logger log = LoggerFactory.getLogger(Errors.class); http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java index 248b7ec..bf76557 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java @@ -718,6 +718,17 @@ public class Protocol { public static final Schema[] UPDATE_METADATA_REQUEST = new Schema[] {UPDATE_METADATA_REQUEST_V0, UPDATE_METADATA_REQUEST_V1, UPDATE_METADATA_REQUEST_V2}; public static final Schema[] UPDATE_METADATA_RESPONSE = new Schema[] {UPDATE_METADATA_RESPONSE_V0, UPDATE_METADATA_RESPONSE_V1, UPDATE_METADATA_RESPONSE_V2}; + /* SASL handshake api */ + public static final Schema SASL_HANDSHAKE_REQUEST_V0 = new Schema( + new Field("mechanism", STRING, "SASL Mechanism chosen by the client.")); + + public static final Schema SASL_HANDSHAKE_RESPONSE_V0 = new Schema( + new Field("error_code", INT16), + new Field("enabled_mechanisms", new ArrayOf(Type.STRING), "Array of mechanisms enabled in the server.")); + + public static final Schema[] SASL_HANDSHAKE_REQUEST = new Schema[] {SASL_HANDSHAKE_REQUEST_V0}; + public static final Schema[] SASL_HANDSHAKE_RESPONSE = new Schema[] {SASL_HANDSHAKE_RESPONSE_V0}; + /* an array of all requests and responses with all schema versions; a null value in the inner array means that the * particular version is not supported */ public static final Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][]; @@ -744,6 +755,7 @@ public class Protocol { REQUESTS[ApiKeys.SYNC_GROUP.id] = SYNC_GROUP_REQUEST; REQUESTS[ApiKeys.DESCRIBE_GROUPS.id] = DESCRIBE_GROUPS_REQUEST; REQUESTS[ApiKeys.LIST_GROUPS.id] = LIST_GROUPS_REQUEST; + REQUESTS[ApiKeys.SASL_HANDSHAKE.id] = SASL_HANDSHAKE_REQUEST; RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE; RESPONSES[ApiKeys.FETCH.id] = FETCH_RESPONSE; @@ -762,6 +774,7 @@ public class Protocol { RESPONSES[ApiKeys.SYNC_GROUP.id] = SYNC_GROUP_RESPONSE; RESPONSES[ApiKeys.DESCRIBE_GROUPS.id] = DESCRIBE_GROUPS_RESPONSE; RESPONSES[ApiKeys.LIST_GROUPS.id] = LIST_GROUPS_RESPONSE; + RESPONSES[ApiKeys.SASL_HANDSHAKE.id] = SASL_HANDSHAKE_RESPONSE; /* set the maximum version of each api */ for (ApiKeys api : ApiKeys.values()) http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java index 5a40b7f..89c2ce1 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java @@ -72,6 +72,8 @@ public abstract class AbstractRequest extends AbstractRequestResponse { return DescribeGroupsRequest.parse(buffer, versionId); case LIST_GROUPS: return ListGroupsRequest.parse(buffer, versionId); + case SASL_HANDSHAKE: + return SaslHandshakeRequest.parse(buffer, versionId); default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `getRequest`, the " + "code should be updated to do so.", apiKey)); http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java b/clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java index 12b06d1..9494de7 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java @@ -31,7 +31,7 @@ public class ResponseSend extends NetworkSend { this(destination, header, response.toStruct()); } - private static ByteBuffer serialize(ResponseHeader header, Struct body) { + public static ByteBuffer serialize(ResponseHeader header, Struct body) { ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + body.sizeOf()); header.writeTo(buffer); body.writeTo(buffer); http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java new file mode 100644 index 0000000..bddc9f0 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.requests; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; + +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; + + +/** + * Request from SASL client containing client SASL mechanism. + * <p/> + * For interoperability with Kafka 0.9.0.x, the mechanism flow may be omitted when using GSSAPI. Hence + * this request should not conflict with the first GSSAPI client packet. For GSSAPI, the first context + * establishment packet starts with byte 0x60 (APPLICATION-0 tag) followed by a variable-length encoded size. + * This handshake request starts with a request header two-byte API key set to 17, followed by a mechanism name, + * making it easy to distinguish from a GSSAPI packet. + */ +public class SaslHandshakeRequest extends AbstractRequest { + + private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.SASL_HANDSHAKE.id); + public static final String MECHANISM_KEY_NAME = "mechanism"; + + private final String mechanism; + + public SaslHandshakeRequest(String mechanism) { + super(new Struct(CURRENT_SCHEMA)); + struct.set(MECHANISM_KEY_NAME, mechanism); + this.mechanism = mechanism; + } + + public SaslHandshakeRequest(Struct struct) { + super(struct); + mechanism = struct.getString(MECHANISM_KEY_NAME); + } + + public String mechanism() { + return mechanism; + } + + @Override + public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) { + switch (versionId) { + case 0: + List<String> enabledMechanisms = Collections.emptyList(); + return new SaslHandshakeResponse(Errors.forException(e).code(), enabledMechanisms); + default: + throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", + versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.SASL_HANDSHAKE.id))); + } + } + + public static SaslHandshakeRequest parse(ByteBuffer buffer, int versionId) { + return new SaslHandshakeRequest(ProtoUtils.parseRequest(ApiKeys.SASL_HANDSHAKE.id, versionId, buffer)); + } + + public static SaslHandshakeRequest parse(ByteBuffer buffer) { + return new SaslHandshakeRequest(CURRENT_SCHEMA.read(buffer)); + } +} + http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java new file mode 100644 index 0000000..c0fc495 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.requests; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; + + +/** + * Response from SASL server which indicates if the client-chosen mechanism is enabled in the server. + * For error responses, the list of enabled mechanisms is included in the response. + */ +public class SaslHandshakeResponse extends AbstractRequestResponse { + + private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.SASL_HANDSHAKE.id); + + private static final String ERROR_CODE_KEY_NAME = "error_code"; + private static final String ENABLED_MECHANISMS_KEY_NAME = "enabled_mechanisms"; + + /** + * Possible error codes: + * UNSUPPORTED_SASL_MECHANISM(33): Client mechanism not enabled in server + * ILLEGAL_SASL_STATE(34) : Invalid request during SASL handshake + */ + private final short errorCode; + private final List<String> enabledMechanisms; + + public SaslHandshakeResponse(short errorCode, Collection<String> enabledMechanisms) { + super(new Struct(CURRENT_SCHEMA)); + struct.set(ERROR_CODE_KEY_NAME, errorCode); + struct.set(ENABLED_MECHANISMS_KEY_NAME, enabledMechanisms.toArray()); + this.errorCode = errorCode; + this.enabledMechanisms = new ArrayList<>(enabledMechanisms); + } + + public SaslHandshakeResponse(Struct struct) { + super(struct); + errorCode = struct.getShort(ERROR_CODE_KEY_NAME); + Object[] mechanisms = struct.getArray(ENABLED_MECHANISMS_KEY_NAME); + ArrayList<String> enabledMechanisms = new ArrayList<>(); + for (Object mechanism : mechanisms) + enabledMechanisms.add((String) mechanism); + this.enabledMechanisms = enabledMechanisms; + } + + public short errorCode() { + return errorCode; + } + + public List<String> enabledMechanisms() { + return enabledMechanisms; + } + + public static SaslHandshakeResponse parse(ByteBuffer buffer) { + return new SaslHandshakeResponse(CURRENT_SCHEMA.read(buffer)); + } + + public static SaslHandshakeResponse parse(ByteBuffer buffer, int version) { + return new SaslHandshakeResponse(ProtoUtils.parseResponse(ApiKeys.SASL_HANDSHAKE.id, version, buffer)); + } +} + http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/security/auth/AuthCallbackHandler.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/AuthCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/auth/AuthCallbackHandler.java new file mode 100644 index 0000000..ed2c087 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/security/auth/AuthCallbackHandler.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.auth; + +import java.util.Map; + +import org.apache.kafka.common.network.Mode; + +import javax.security.auth.Subject; +import javax.security.auth.callback.CallbackHandler; + +/* + * Callback handler for SASL-based authentication + */ +public interface AuthCallbackHandler extends CallbackHandler { + + /** + * Configures this callback handler. + * + * @param configs Configuration + * @param mode The mode that indicates if this is a client or server connection + * @param subject Subject from login context + * @param saslMechanism Negotiated SASL mechanism + */ + void configure(Map<String, ?> configs, Mode mode, Subject subject, String saslMechanism); + + /** + * Closes this instance. + */ + void close(); +} http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/security/auth/Login.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/Login.java b/clients/src/main/java/org/apache/kafka/common/security/auth/Login.java new file mode 100644 index 0000000..1ac779d --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/security/auth/Login.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.auth; + +import java.util.Map; + +import javax.security.auth.Subject; +import javax.security.auth.login.LoginContext; +import javax.security.auth.login.LoginException; + +/** + * Login interface for authentication. + */ +public interface Login { + + /** + * Configures this login instance. + */ + void configure(Map<String, ?> configs, String loginContextName); + + /** + * Performs login for each login module specified for the login context of this instance. + */ + LoginContext login() throws LoginException; + + /** + * Returns the authenticated subject of this login context. + */ + Subject subject(); + + /** + * Returns the service name to be used for SASL. + */ + String serviceName(); + + /** + * Closes this instance. + */ + void close(); +} + http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/security/authenticator/AbstractLogin.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/AbstractLogin.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/AbstractLogin.java new file mode 100644 index 0000000..2fe43ab --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/AbstractLogin.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.authenticator; + +import javax.security.auth.login.AppConfigurationEntry; +import javax.security.auth.login.Configuration; +import javax.security.auth.login.LoginContext; +import javax.security.auth.login.LoginException; +import javax.security.sasl.RealmCallback; +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.CallbackHandler; +import javax.security.auth.callback.NameCallback; +import javax.security.auth.callback.PasswordCallback; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.auth.Subject; + +import org.apache.kafka.common.security.JaasUtils; +import org.apache.kafka.common.security.auth.Login; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +/** + * Base login class that implements methods common to typical SASL mechanisms. + */ +public abstract class AbstractLogin implements Login { + private static final Logger log = LoggerFactory.getLogger(AbstractLogin.class); + + private String loginContextName; + private LoginContext loginContext; + + + @Override + public void configure(Map<String, ?> configs, String loginContextName) { + this.loginContextName = loginContextName; + } + + @Override + public LoginContext login() throws LoginException { + String jaasConfigFile = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM); + if (jaasConfigFile == null) { + log.debug("System property '" + JaasUtils.JAVA_LOGIN_CONFIG_PARAM + "' is not set, using default JAAS configuration."); + } + AppConfigurationEntry[] configEntries = Configuration.getConfiguration().getAppConfigurationEntry(loginContextName); + if (configEntries == null) { + String errorMessage = "Could not find a '" + loginContextName + "' entry in the JAAS configuration. System property '" + + JaasUtils.JAVA_LOGIN_CONFIG_PARAM + "' is " + (jaasConfigFile == null ? "not set" : jaasConfigFile); + throw new IllegalArgumentException(errorMessage); + } + + loginContext = new LoginContext(loginContextName, new LoginCallbackHandler()); + loginContext.login(); + log.info("Successfully logged in."); + return loginContext; + } + + @Override + public Subject subject() { + return loginContext.getSubject(); + } + + /** + * Callback handler for creating login context. Login callback handlers + * should support the callbacks required for the login modules used by + * the KafkaServer and KafkaClient contexts. Kafka does not support + * callback handlers which require additional user input. + * + */ + public static class LoginCallbackHandler implements CallbackHandler { + + @Override + public void handle(Callback[] callbacks) throws UnsupportedCallbackException { + for (Callback callback : callbacks) { + if (callback instanceof NameCallback) { + NameCallback nc = (NameCallback) callback; + nc.setName(nc.getDefaultName()); + } else if (callback instanceof PasswordCallback) { + String errorMessage = "Could not login: the client is being asked for a password, but the Kafka" + + " client code does not currently support obtaining a password from the user."; + throw new UnsupportedCallbackException(callback, errorMessage); + } else if (callback instanceof RealmCallback) { + RealmCallback rc = (RealmCallback) callback; + rc.setText(rc.getDefaultText()); + } else { + throw new UnsupportedCallbackException(callback, "Unrecognized SASL Login callback"); + } + } + } + } +} + http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultLogin.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultLogin.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultLogin.java new file mode 100644 index 0000000..0a405bc --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultLogin.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.authenticator; + +public class DefaultLogin extends AbstractLogin { + + @Override + public String serviceName() { + return "kafka"; + } + + @Override + public void close() { + } +} + http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java new file mode 100644 index 0000000..9aec9a7 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.security.authenticator; + +import javax.security.auth.Subject; +import javax.security.auth.login.LoginException; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.EnumMap; +import java.util.Map; + +import org.apache.kafka.common.network.LoginType; +import org.apache.kafka.common.security.auth.Login; +import org.apache.kafka.common.security.kerberos.KerberosLogin; + +public class LoginManager { + + private static final EnumMap<LoginType, LoginManager> CACHED_INSTANCES = new EnumMap<>(LoginType.class); + + private final Login login; + private final LoginType loginType; + private int refCount; + + private LoginManager(LoginType loginType, boolean hasKerberos, Map<String, ?> configs) throws IOException, LoginException { + this.loginType = loginType; + String loginContext = loginType.contextName(); + login = hasKerberos ? new KerberosLogin() : new DefaultLogin(); + login.configure(configs, loginContext); + login.login(); + } + + /** + * Returns an instance of `LoginManager` and increases its reference count. + * + * `release()` should be invoked when the `LoginManager` is no longer needed. This method will try to reuse an + * existing `LoginManager` for the provided `mode` if available. However, it expects `configs` to be the same for + * every invocation and it will ignore them in the case where it's returning a cached instance of `LoginManager`. + * + * This is a bit ugly and it would be nicer if we could pass the `LoginManager` to `ChannelBuilders.create` and + * shut it down when the broker or clients are closed. It's straightforward to do the former, but it's more + * complicated to do the latter without making the consumer API more complex. + * + * @param loginType the type of the login context, it should be SERVER for the broker and CLIENT for the clients + * (i.e. consumer and producer) + * @param configs configuration as key/value pairs + */ + public static final LoginManager acquireLoginManager(LoginType loginType, boolean hasKerberos, Map<String, ?> configs) throws IOException, LoginException { + synchronized (LoginManager.class) { + LoginManager loginManager = CACHED_INSTANCES.get(loginType); + if (loginManager == null) { + loginManager = new LoginManager(loginType, hasKerberos, configs); + CACHED_INSTANCES.put(loginType, loginManager); + } + return loginManager.acquire(); + } + } + + public Subject subject() { + return login.subject(); + } + + public String serviceName() { + return login.serviceName(); + } + + private LoginManager acquire() { + ++refCount; + return this; + } + + /** + * Decrease the reference count for this instance and release resources if it reaches 0. + */ + public void release() { + synchronized (LoginManager.class) { + if (refCount == 0) + throw new IllegalStateException("release called on LoginManager with refCount == 0"); + else if (refCount == 1) { + CACHED_INSTANCES.remove(loginType); + login.close(); + } + --refCount; + } + } + + /* Should only be used in tests. */ + public static void closeAll() { + synchronized (LoginManager.class) { + for (LoginType loginType : new ArrayList<>(CACHED_INSTANCES.keySet())) { + LoginManager loginManager = CACHED_INSTANCES.remove(loginType); + loginManager.login.close(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java index 370e729..ba201dc 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java @@ -23,38 +23,44 @@ import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.util.Arrays; import java.util.Map; - import java.security.Principal; import java.security.PrivilegedActionException; import java.security.PrivilegedExceptionAction; import javax.security.auth.Subject; -import javax.security.auth.callback.Callback; -import javax.security.auth.callback.CallbackHandler; -import javax.security.auth.callback.NameCallback; -import javax.security.auth.callback.PasswordCallback; -import javax.security.auth.callback.UnsupportedCallbackException; -import javax.security.sasl.AuthorizeCallback; -import javax.security.sasl.RealmCallback; import javax.security.sasl.Sasl; import javax.security.sasl.SaslClient; import javax.security.sasl.SaslException; +import org.apache.kafka.common.errors.AuthenticationException; +import org.apache.kafka.common.errors.IllegalSaslStateException; +import org.apache.kafka.common.errors.UnsupportedSaslMechanismException; import org.apache.kafka.common.network.Authenticator; +import org.apache.kafka.common.network.Mode; import org.apache.kafka.common.network.NetworkSend; import org.apache.kafka.common.network.NetworkReceive; import org.apache.kafka.common.network.TransportLayer; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.SchemaException; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.requests.RequestSend; +import org.apache.kafka.common.requests.SaslHandshakeRequest; +import org.apache.kafka.common.requests.SaslHandshakeResponse; +import org.apache.kafka.common.security.auth.AuthCallbackHandler; import org.apache.kafka.common.security.auth.KafkaPrincipal; import org.apache.kafka.common.security.auth.PrincipalBuilder; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.NetworkClient; import org.apache.kafka.common.KafkaException; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class SaslClientAuthenticator implements Authenticator { public enum SaslState { - INITIAL, INTERMEDIATE, COMPLETE, FAILED + SEND_HANDSHAKE_REQUEST, RECEIVE_HANDSHAKE_RESPONSE, INITIAL, INTERMEDIATE, COMPLETE, FAILED } private static final Logger LOG = LoggerFactory.getLogger(SaslClientAuthenticator.class); @@ -63,33 +69,57 @@ public class SaslClientAuthenticator implements Authenticator { private final String servicePrincipal; private final String host; private final String node; + private final String mechanism; + private final boolean handshakeRequestEnable; // assigned in `configure` private SaslClient saslClient; + private Map<String, ?> configs; private String clientPrincipalName; + private AuthCallbackHandler callbackHandler; private TransportLayer transportLayer; // buffers used in `authenticate` private NetworkReceive netInBuffer; private NetworkSend netOutBuffer; - private SaslState saslState = SaslState.INITIAL; + // Current SASL state + private SaslState saslState; + // Next SASL state to be set when outgoing writes associated with the current SASL state complete + private SaslState pendingSaslState; + // Correlation ID for the next request + private int correlationId; + // Request header for which response from the server is pending + private RequestHeader currentRequestHeader; - public SaslClientAuthenticator(String node, Subject subject, String servicePrincipal, String host) throws IOException { + public SaslClientAuthenticator(String node, Subject subject, String servicePrincipal, String host, String mechanism, boolean handshakeRequestEnable) throws IOException { this.node = node; this.subject = subject; this.host = host; this.servicePrincipal = servicePrincipal; + this.mechanism = mechanism; + this.handshakeRequestEnable = handshakeRequestEnable; + this.correlationId = -1; } public void configure(TransportLayer transportLayer, PrincipalBuilder principalBuilder, Map<String, ?> configs) throws KafkaException { try { this.transportLayer = transportLayer; + this.configs = configs; + + setSaslState(handshakeRequestEnable ? SaslState.SEND_HANDSHAKE_REQUEST : SaslState.INITIAL); // determine client principal from subject. - Principal clientPrincipal = subject.getPrincipals().iterator().next(); - this.clientPrincipalName = clientPrincipal.getName(); - this.saslClient = createSaslClient(); + if (!subject.getPrincipals().isEmpty()) { + Principal clientPrincipal = subject.getPrincipals().iterator().next(); + this.clientPrincipalName = clientPrincipal.getName(); + } else { + clientPrincipalName = null; + } + callbackHandler = new SaslClientCallbackHandler(); + callbackHandler.configure(configs, Mode.CLIENT, subject, mechanism); + + saslClient = createSaslClient(); } catch (Exception e) { throw new KafkaException("Failed to configure SaslClientAuthenticator", e); } @@ -99,15 +129,14 @@ public class SaslClientAuthenticator implements Authenticator { try { return Subject.doAs(subject, new PrivilegedExceptionAction<SaslClient>() { public SaslClient run() throws SaslException { - String[] mechs = {"GSSAPI"}; + String[] mechs = {mechanism}; LOG.debug("Creating SaslClient: client={};service={};serviceHostname={};mechs={}", clientPrincipalName, servicePrincipal, host, Arrays.toString(mechs)); - return Sasl.createSaslClient(mechs, clientPrincipalName, servicePrincipal, host, null, - new ClientCallbackHandler()); + return Sasl.createSaslClient(mechs, clientPrincipalName, servicePrincipal, host, configs, callbackHandler); } }); } catch (PrivilegedActionException e) { - throw new KafkaException("Failed to create SaslClient", e.getCause()); + throw new KafkaException("Failed to create SaslClient with mechanism " + mechanism, e.getCause()); } } @@ -123,22 +152,39 @@ public class SaslClientAuthenticator implements Authenticator { return; switch (saslState) { + case SEND_HANDSHAKE_REQUEST: + String clientId = (String) configs.get(CommonClientConfigs.CLIENT_ID_CONFIG); + currentRequestHeader = new RequestHeader(ApiKeys.SASL_HANDSHAKE.id, clientId, correlationId++); + SaslHandshakeRequest handshakeRequest = new SaslHandshakeRequest(mechanism); + send(RequestSend.serialize(currentRequestHeader, handshakeRequest.toStruct())); + setSaslState(SaslState.RECEIVE_HANDSHAKE_RESPONSE); + break; + case RECEIVE_HANDSHAKE_RESPONSE: + byte[] responseBytes = receiveResponseOrToken(); + if (responseBytes == null) + break; + else { + try { + handleKafkaResponse(currentRequestHeader, responseBytes); + currentRequestHeader = null; + } catch (Exception e) { + setSaslState(SaslState.FAILED); + throw e; + } + setSaslState(SaslState.INITIAL); + // Fall through and start SASL authentication using the configured client mechanism + } case INITIAL: - sendSaslToken(new byte[0]); - saslState = SaslState.INTERMEDIATE; + sendSaslToken(new byte[0], true); + setSaslState(SaslState.INTERMEDIATE); break; case INTERMEDIATE: - if (netInBuffer == null) netInBuffer = new NetworkReceive(node); - netInBuffer.readFrom(transportLayer); - if (netInBuffer.complete()) { - netInBuffer.payload().rewind(); - byte[] serverToken = new byte[netInBuffer.payload().remaining()]; - netInBuffer.payload().get(serverToken, 0, serverToken.length); - netInBuffer = null; // reset the networkReceive as we read all the data. - sendSaslToken(serverToken); + byte[] serverToken = receiveResponseOrToken(); + if (serverToken != null) { + sendSaslToken(serverToken, false); } if (saslClient.isComplete()) { - saslState = SaslState.COMPLETE; + setSaslState(SaslState.COMPLETE); transportLayer.removeInterestOps(SelectionKey.OP_WRITE); } break; @@ -149,30 +195,58 @@ public class SaslClientAuthenticator implements Authenticator { } } - private void sendSaslToken(byte[] serverToken) throws IOException { + private void setSaslState(SaslState saslState) { + if (netOutBuffer != null && !netOutBuffer.completed()) + pendingSaslState = saslState; + else { + this.pendingSaslState = null; + this.saslState = saslState; + LOG.debug("Set SASL client state to {}", saslState); + } + } + + private void sendSaslToken(byte[] serverToken, boolean isInitial) throws IOException { if (!saslClient.isComplete()) { - try { - byte[] saslToken = createSaslToken(serverToken); - if (saslToken != null) { - netOutBuffer = new NetworkSend(node, ByteBuffer.wrap(saslToken)); - flushNetOutBufferAndUpdateInterestOps(); - } - } catch (IOException e) { - saslState = SaslState.FAILED; - throw e; - } + byte[] saslToken = createSaslToken(serverToken, isInitial); + if (saslToken != null) + send(ByteBuffer.wrap(saslToken)); + } + } + + private void send(ByteBuffer buffer) throws IOException { + try { + netOutBuffer = new NetworkSend(node, buffer); + flushNetOutBufferAndUpdateInterestOps(); + } catch (IOException e) { + setSaslState(SaslState.FAILED); + throw e; } } private boolean flushNetOutBufferAndUpdateInterestOps() throws IOException { boolean flushedCompletely = flushNetOutBuffer(); - if (flushedCompletely) + if (flushedCompletely) { transportLayer.removeInterestOps(SelectionKey.OP_WRITE); - else + if (pendingSaslState != null) + setSaslState(pendingSaslState); + } else transportLayer.addInterestOps(SelectionKey.OP_WRITE); return flushedCompletely; } + private byte[] receiveResponseOrToken() throws IOException { + if (netInBuffer == null) netInBuffer = new NetworkReceive(node); + netInBuffer.readFrom(transportLayer); + byte[] serverPacket = null; + if (netInBuffer.complete()) { + netInBuffer.payload().rewind(); + serverPacket = new byte[netInBuffer.payload().remaining()]; + netInBuffer.payload().get(serverPacket, 0, serverPacket.length); + netInBuffer = null; // reset the networkReceive as we read all the data. + } + return serverPacket; + } + public Principal principal() { return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, clientPrincipalName); } @@ -182,19 +256,25 @@ public class SaslClientAuthenticator implements Authenticator { } public void close() throws IOException { - saslClient.dispose(); + if (saslClient != null) + saslClient.dispose(); + if (callbackHandler != null) + callbackHandler.close(); } - private byte[] createSaslToken(final byte[] saslToken) throws SaslException { + private byte[] createSaslToken(final byte[] saslToken, boolean isInitial) throws SaslException { if (saslToken == null) throw new SaslException("Error authenticating with the Kafka Broker: received a `null` saslToken."); try { - return Subject.doAs(subject, new PrivilegedExceptionAction<byte[]>() { - public byte[] run() throws SaslException { - return saslClient.evaluateChallenge(saslToken); - } - }); + if (isInitial && !saslClient.hasInitialResponse()) + return saslToken; + else + return Subject.doAs(subject, new PrivilegedExceptionAction<byte[]>() { + public byte[] run() throws SaslException { + return saslClient.evaluateChallenge(saslToken); + } + }); } catch (PrivilegedActionException e) { String error = "An error: (" + e + ") occurred when evaluating SASL token received from the Kafka Broker."; // Try to provide hints to use about what went wrong so they can fix their configuration. @@ -221,35 +301,39 @@ public class SaslClientAuthenticator implements Authenticator { return netOutBuffer.completed(); } - public static class ClientCallbackHandler implements CallbackHandler { - - public void handle(Callback[] callbacks) throws UnsupportedCallbackException { - for (Callback callback : callbacks) { - if (callback instanceof NameCallback) { - NameCallback nc = (NameCallback) callback; - nc.setName(nc.getDefaultName()); - } else if (callback instanceof PasswordCallback) { - // Call `setPassword` once we support obtaining a password from the user and update message below - throw new UnsupportedCallbackException(callback, "Could not login: the client is being asked for a password, but the Kafka" + - " client code does not currently support obtaining a password from the user." + - " Make sure -Djava.security.auth.login.config property passed to JVM and" + - " the client is configured to use a ticket cache (using" + - " the JAAS configuration setting 'useTicketCache=true)'. Make sure you are using" + - " FQDN of the Kafka broker you are trying to connect to."); - } else if (callback instanceof RealmCallback) { - RealmCallback rc = (RealmCallback) callback; - rc.setText(rc.getDefaultText()); - } else if (callback instanceof AuthorizeCallback) { - AuthorizeCallback ac = (AuthorizeCallback) callback; - String authId = ac.getAuthenticationID(); - String authzId = ac.getAuthorizationID(); - ac.setAuthorized(authId.equals(authzId)); - if (ac.isAuthorized()) - ac.setAuthorizedID(authzId); - } else { - throw new UnsupportedCallbackException(callback, "Unrecognized SASL ClientCallback"); - } - } + private void handleKafkaResponse(RequestHeader requestHeader, byte[] responseBytes) { + Struct struct; + ApiKeys apiKey; + try { + struct = NetworkClient.parseResponse(ByteBuffer.wrap(responseBytes), requestHeader); + apiKey = ApiKeys.forId(requestHeader.apiKey()); + } catch (SchemaException | IllegalArgumentException e) { + LOG.debug("Invalid SASL mechanism response, server may be expecting only GSSAPI tokens"); + throw new AuthenticationException("Invalid SASL mechanism response", e); + } + switch (apiKey) { + case SASL_HANDSHAKE: + handleSaslHandshakeResponse(new SaslHandshakeResponse(struct)); + break; + default: + throw new IllegalStateException("Unexpected API key during handshake: " + apiKey); + } + } + + private void handleSaslHandshakeResponse(SaslHandshakeResponse response) { + Errors error = Errors.forCode(response.errorCode()); + switch (error) { + case NONE: + break; + case UNSUPPORTED_SASL_MECHANISM: + throw new UnsupportedSaslMechanismException(String.format("Client SASL mechanism '%s' not enabled in the server, enabled mechanisms are %s", + mechanism, response.enabledMechanisms())); + case ILLEGAL_SASL_STATE: + throw new IllegalSaslStateException(String.format("Unexpected handshake request with client mechanism %s, enabled mechanisms are %s", + mechanism, response.enabledMechanisms())); + default: + throw new AuthenticationException(String.format("Unknown error code %d, client mechanism is %s, enabled mechanisms are %s", + response.errorCode(), mechanism, response.enabledMechanisms())); } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/5b375d7b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java new file mode 100644 index 0000000..8e0b8db --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.security.authenticator; + +import java.util.Map; + +import javax.security.auth.Subject; +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.NameCallback; +import javax.security.auth.callback.PasswordCallback; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.sasl.AuthorizeCallback; +import javax.security.sasl.RealmCallback; + +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.network.Mode; +import org.apache.kafka.common.security.auth.AuthCallbackHandler; + +/** + * Callback handler for Sasl clients. The callbacks required for the SASL mechanism + * configured for the client should be supported by this callback handler. See + * <a href="https://docs.oracle.com/javase/8/docs/technotes/guides/security/sasl/sasl-refguide.html">Java SASL API</a> + * for the list of SASL callback handlers required for each SASL mechanism. + */ +public class SaslClientCallbackHandler implements AuthCallbackHandler { + + private boolean isKerberos; + private Subject subject; + + @Override + public void configure(Map<String, ?> configs, Mode mode, Subject subject, String mechanism) { + this.isKerberos = mechanism.equals(SaslConfigs.GSSAPI_MECHANISM); + this.subject = subject; + } + + @Override + public void handle(Callback[] callbacks) throws UnsupportedCallbackException { + for (Callback callback : callbacks) { + if (callback instanceof NameCallback) { + NameCallback nc = (NameCallback) callback; + if (!isKerberos && subject != null && !subject.getPublicCredentials(String.class).isEmpty()) { + nc.setName(subject.getPublicCredentials(String.class).iterator().next()); + } else + nc.setName(nc.getDefaultName()); + } else if (callback instanceof PasswordCallback) { + if (!isKerberos && subject != null && !subject.getPrivateCredentials(String.class).isEmpty()) { + char [] password = subject.getPrivateCredentials(String.class).iterator().next().toCharArray(); + ((PasswordCallback) callback).setPassword(password); + } else { + String errorMessage = "Could not login: the client is being asked for a password, but the Kafka" + + " client code does not currently support obtaining a password from the user."; + if (isKerberos) { + errorMessage += " Make sure -Djava.security.auth.login.config property passed to JVM and" + + " the client is configured to use a ticket cache (using" + + " the JAAS configuration setting 'useTicketCache=true)'. Make sure you are using" + + " FQDN of the Kafka broker you are trying to connect to."; + } + throw new UnsupportedCallbackException(callback, errorMessage); + } + } else if (callback instanceof RealmCallback) { + RealmCallback rc = (RealmCallback) callback; + rc.setText(rc.getDefaultText()); + } else if (callback instanceof AuthorizeCallback) { + AuthorizeCallback ac = (AuthorizeCallback) callback; + String authId = ac.getAuthenticationID(); + String authzId = ac.getAuthorizationID(); + ac.setAuthorized(authId.equals(authzId)); + if (ac.isAuthorized()) + ac.setAuthorizedID(authzId); + } else { + throw new UnsupportedCallbackException(callback, "Unrecognized SASL ClientCallback"); + } + } + } + + @Override + public void close() { + } +} \ No newline at end of file
