This is an automated email from the ASF dual-hosted git repository. manikumar pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 4824dc9 KAFKA-7972: Use automatic RPC generation in SaslHandshake 4824dc9 is described below commit 4824dc994d7fc56b7540b643a78aadb4bdd0f14d Author: Mickael Maison <mickael.mai...@gmail.com> AuthorDate: Mon Feb 25 11:20:07 2019 +0530 KAFKA-7972: Use automatic RPC generation in SaslHandshake Author: Mickael Maison <mickael.mai...@gmail.com> Reviewers: Manikumar Reddy <manikumar.re...@gmail.com> Closes #6301 from mimaison/sasl-handshake --- checkstyle/import-control.xml | 1 + .../org/apache/kafka/common/protocol/ApiKeys.java | 6 +- .../kafka/common/requests/AbstractResponse.java | 2 +- .../common/requests/SaslHandshakeRequest.java | 73 +++++++--------------- .../common/requests/SaslHandshakeResponse.java | 62 +++++------------- .../authenticator/SaslClientAuthenticator.java | 4 +- .../authenticator/SaslServerAuthenticator.java | 19 +++--- .../kafka/common/requests/RequestResponseTest.java | 9 ++- .../authenticator/SaslAuthenticatorTest.java | 16 +++-- core/src/main/scala/kafka/server/KafkaApis.scala | 4 +- .../scala/unit/kafka/server/RequestQuotaTest.scala | 3 +- .../kafka/server/SaslApiVersionsRequestTest.scala | 3 +- 12 files changed, 86 insertions(+), 116 deletions(-) diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 4702011..ffc9bf9 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -102,6 +102,7 @@ <allow pkg="org.apache.kafka.common.protocol" /> <allow pkg="org.apache.kafka.common.errors" /> <subpackage name="authenticator"> + <allow pkg="org.apache.kafka.common.message" /> <allow pkg="org.apache.kafka.common.protocol.types" /> <allow pkg="org.apache.kafka.common.requests" /> <allow pkg="org.apache.kafka.clients" /> diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index 1acd4e1..937b044 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -22,6 +22,8 @@ import org.apache.kafka.common.message.ElectPreferredLeadersRequestData; import org.apache.kafka.common.message.ElectPreferredLeadersResponseData; import org.apache.kafka.common.message.LeaveGroupRequestData; import org.apache.kafka.common.message.LeaveGroupResponseData; +import org.apache.kafka.common.message.SaslHandshakeRequestData; +import org.apache.kafka.common.message.SaslHandshakeResponseData; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.SchemaException; import org.apache.kafka.common.protocol.types.Struct; @@ -97,8 +99,6 @@ import org.apache.kafka.common.requests.RenewDelegationTokenRequest; import org.apache.kafka.common.requests.RenewDelegationTokenResponse; import org.apache.kafka.common.requests.SaslAuthenticateRequest; import org.apache.kafka.common.requests.SaslAuthenticateResponse; -import org.apache.kafka.common.requests.SaslHandshakeRequest; -import org.apache.kafka.common.requests.SaslHandshakeResponse; import org.apache.kafka.common.requests.StopReplicaRequest; import org.apache.kafka.common.requests.StopReplicaResponse; import org.apache.kafka.common.requests.SyncGroupRequest; @@ -142,7 +142,7 @@ public enum ApiKeys { DESCRIBE_GROUPS(15, "DescribeGroups", DescribeGroupsRequest.schemaVersions(), DescribeGroupsResponse.schemaVersions()), LIST_GROUPS(16, "ListGroups", ListGroupsRequest.schemaVersions(), ListGroupsResponse.schemaVersions()), - SASL_HANDSHAKE(17, "SaslHandshake", SaslHandshakeRequest.schemaVersions(), SaslHandshakeResponse.schemaVersions()), + SASL_HANDSHAKE(17, "SaslHandshake", SaslHandshakeRequestData.SCHEMAS, SaslHandshakeResponseData.SCHEMAS), API_VERSIONS(18, "ApiVersions", ApiVersionsRequest.schemaVersions(), ApiVersionsResponse.schemaVersions()) { @Override public Struct parseResponse(short version, ByteBuffer buffer) { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index d9bb69e..68b505b 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -105,7 +105,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse { case LIST_GROUPS: return new ListGroupsResponse(struct); case SASL_HANDSHAKE: - return new SaslHandshakeResponse(struct); + return new SaslHandshakeResponse(struct, version); case API_VERSIONS: return new ApiVersionsResponse(struct); case CREATE_TOPICS: diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java index 7225eb7..be7f4f8 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java @@ -16,17 +16,13 @@ */ package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.SaslHandshakeRequestData; +import org.apache.kafka.common.message.SaslHandshakeResponseData; import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.types.Field; -import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.List; - -import static org.apache.kafka.common.protocol.types.Type.STRING; /** * Request from SASL client containing client SASL mechanism. @@ -38,73 +34,54 @@ import static org.apache.kafka.common.protocol.types.Type.STRING; * making it easy to distinguish from a GSSAPI packet. */ public class SaslHandshakeRequest extends AbstractRequest { - private static final String MECHANISM_KEY_NAME = "mechanism"; - - private static final Schema SASL_HANDSHAKE_REQUEST_V0 = new Schema( - new Field("mechanism", STRING, "SASL Mechanism chosen by the client.")); - - // SASL_HANDSHAKE_REQUEST_V1 added to support SASL_AUTHENTICATE request to improve diagnostics - private static final Schema SASL_HANDSHAKE_REQUEST_V1 = SASL_HANDSHAKE_REQUEST_V0; - - public static Schema[] schemaVersions() { - return new Schema[]{SASL_HANDSHAKE_REQUEST_V0, SASL_HANDSHAKE_REQUEST_V1}; - } - - private final String mechanism; public static class Builder extends AbstractRequest.Builder<SaslHandshakeRequest> { - private final String mechanism; + private final SaslHandshakeRequestData data; - public Builder(String mechanism) { + public Builder(SaslHandshakeRequestData data) { super(ApiKeys.SASL_HANDSHAKE); - this.mechanism = mechanism; + this.data = data; } @Override public SaslHandshakeRequest build(short version) { - return new SaslHandshakeRequest(mechanism, version); + return new SaslHandshakeRequest(data, version); } @Override public String toString() { - StringBuilder bld = new StringBuilder(); - bld.append("(type=SaslHandshakeRequest"). - append(", mechanism=").append(mechanism). - append(")"); - return bld.toString(); + return data.toString(); } } - public SaslHandshakeRequest(String mechanism) { - this(mechanism, ApiKeys.SASL_HANDSHAKE.latestVersion()); + private final SaslHandshakeRequestData data; + private final short version; + + public SaslHandshakeRequest(SaslHandshakeRequestData data) { + this(data, ApiKeys.SASL_HANDSHAKE.latestVersion()); } - public SaslHandshakeRequest(String mechanism, short version) { + public SaslHandshakeRequest(SaslHandshakeRequestData data, short version) { super(ApiKeys.SASL_HANDSHAKE, version); - this.mechanism = mechanism; + this.data = data; + this.version = version; } public SaslHandshakeRequest(Struct struct, short version) { super(ApiKeys.SASL_HANDSHAKE, version); - mechanism = struct.getString(MECHANISM_KEY_NAME); + this.data = new SaslHandshakeRequestData(struct, version); + this.version = version; } - public String mechanism() { - return mechanism; + public SaslHandshakeRequestData data() { + return data; } @Override public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { - short versionId = version(); - switch (versionId) { - case 0: - case 1: - List<String> enabledMechanisms = Collections.emptyList(); - return new SaslHandshakeResponse(Errors.forException(e), enabledMechanisms); - default: - throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", - versionId, this.getClass().getSimpleName(), ApiKeys.SASL_HANDSHAKE.latestVersion())); - } + SaslHandshakeResponseData response = new SaslHandshakeResponseData(); + response.setErrorCode(ApiError.fromThrowable(e).error().code()); + return new SaslHandshakeResponse(response); } public static SaslHandshakeRequest parse(ByteBuffer buffer, short version) { @@ -113,9 +90,7 @@ public class SaslHandshakeRequest extends AbstractRequest { @Override protected Struct toStruct() { - Struct struct = new Struct(ApiKeys.SASL_HANDSHAKE.requestSchema(version())); - struct.set(MECHANISM_KEY_NAME, mechanism); - return struct; + return data.toStruct(version); } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java index 9faa36c..939c5e8 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java @@ -16,84 +16,56 @@ */ package org.apache.kafka.common.requests; +import org.apache.kafka.common.message.SaslHandshakeResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.types.ArrayOf; -import org.apache.kafka.common.protocol.types.Field; -import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; -import org.apache.kafka.common.protocol.types.Type; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; -import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; - - /** * Response from SASL server which indicates if the client-chosen mechanism is enabled in the server. * For error responses, the list of enabled mechanisms is included in the response. */ public class SaslHandshakeResponse extends AbstractResponse { - private static final String ENABLED_MECHANISMS_KEY_NAME = "enabled_mechanisms"; - - private static final Schema SASL_HANDSHAKE_RESPONSE_V0 = new Schema( - ERROR_CODE, - new Field(ENABLED_MECHANISMS_KEY_NAME, new ArrayOf(Type.STRING), "Array of mechanisms enabled in the server.")); - - private static final Schema SASL_HANDSHAKE_RESPONSE_V1 = SASL_HANDSHAKE_RESPONSE_V0; - - public static Schema[] schemaVersions() { - return new Schema[]{SASL_HANDSHAKE_RESPONSE_V0, SASL_HANDSHAKE_RESPONSE_V1}; - } - /** - * Possible error codes: - * UNSUPPORTED_SASL_MECHANISM(33): Client mechanism not enabled in server - * ILLEGAL_SASL_STATE(34) : Invalid request during SASL handshake - */ - private final Errors error; - private final List<String> enabledMechanisms; + private final SaslHandshakeResponseData data; - public SaslHandshakeResponse(Errors error, Collection<String> enabledMechanisms) { - this.error = error; - this.enabledMechanisms = new ArrayList<>(enabledMechanisms); + public SaslHandshakeResponse(SaslHandshakeResponseData data) { + this.data = data; } - public SaslHandshakeResponse(Struct struct) { - error = Errors.forCode(struct.get(ERROR_CODE)); - Object[] mechanisms = struct.getArray(ENABLED_MECHANISMS_KEY_NAME); - ArrayList<String> enabledMechanisms = new ArrayList<>(); - for (Object mechanism : mechanisms) - enabledMechanisms.add((String) mechanism); - this.enabledMechanisms = enabledMechanisms; + public SaslHandshakeResponse(Struct struct, short version) { + this.data = new SaslHandshakeResponseData(struct, version); } + /* + * Possible error codes: + * UNSUPPORTED_SASL_MECHANISM(33): Client mechanism not enabled in server + * ILLEGAL_SASL_STATE(34) : Invalid request during SASL handshake + */ public Errors error() { - return error; + return Errors.forCode(data.errorCode()); } @Override public Map<Errors, Integer> errorCounts() { - return errorCounts(error); + return Collections.singletonMap(Errors.forCode(data.errorCode()), 1); } @Override public Struct toStruct(short version) { - Struct struct = new Struct(ApiKeys.SASL_HANDSHAKE.responseSchema(version)); - struct.set(ERROR_CODE, error.code()); - struct.set(ENABLED_MECHANISMS_KEY_NAME, enabledMechanisms.toArray()); - return struct; + return data.toStruct(version); } public List<String> enabledMechanisms() { - return enabledMechanisms; + return data.mechanisms(); } public static SaslHandshakeResponse parse(ByteBuffer buffer, short version) { - return new SaslHandshakeResponse(ApiKeys.SASL_HANDSHAKE.parseResponse(version, buffer)); + return new SaslHandshakeResponse(ApiKeys.SASL_HANDSHAKE.parseResponse(version, buffer), version); } } \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java index 78428ee..cc26336 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.errors.IllegalSaslStateException; import org.apache.kafka.common.errors.SaslAuthenticationException; import org.apache.kafka.common.errors.UnsupportedSaslMechanismException; +import org.apache.kafka.common.message.SaslHandshakeRequestData; import org.apache.kafka.common.network.Authenticator; import org.apache.kafka.common.network.NetworkSend; import org.apache.kafka.common.network.ReauthenticationContext; @@ -328,7 +329,8 @@ public class SaslClientAuthenticator implements Authenticator { // Visible to override for testing protected SaslHandshakeRequest createSaslHandshakeRequest(short version) { - return new SaslHandshakeRequest.Builder(mechanism).build(version); + return new SaslHandshakeRequest.Builder( + new SaslHandshakeRequestData().setMechanism(mechanism)).build(version); } // Visible to override for testing diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java index 1e62e7f..ccd94fc 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.errors.InvalidRequestException; import org.apache.kafka.common.errors.SaslAuthenticationException; import org.apache.kafka.common.errors.UnsupportedSaslMechanismException; import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.message.SaslHandshakeResponseData; import org.apache.kafka.common.network.Authenticator; import org.apache.kafka.common.network.ChannelBuilders; import org.apache.kafka.common.network.ListenerName; @@ -35,7 +36,6 @@ import org.apache.kafka.common.network.Send; import org.apache.kafka.common.network.TransportLayer; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.requests.AbstractResponse; import org.apache.kafka.common.requests.ApiVersionsRequest; import org.apache.kafka.common.requests.ApiVersionsResponse; @@ -50,6 +50,7 @@ import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; import org.apache.kafka.common.security.auth.KafkaPrincipal; import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder; import org.apache.kafka.common.security.auth.SaslAuthenticationContext; +import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.security.kerberos.KerberosError; import org.apache.kafka.common.security.kerberos.KerberosName; import org.apache.kafka.common.security.kerberos.KerberosShortNamer; @@ -77,12 +78,12 @@ import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.security.PrivilegedActionException; import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; import java.util.Date; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Set; public class SaslServerAuthenticator implements Authenticator { // GSSAPI limits requests to 64K, but we allow a bit extra for custom SASL mechanisms @@ -118,7 +119,7 @@ public class SaslServerAuthenticator implements Authenticator { private final String connectionId; private final Map<String, Subject> subjects; private final TransportLayer transportLayer; - private final Set<String> enabledMechanisms; + private final List<String> enabledMechanisms; private final Map<String, ?> configs; private final KafkaPrincipalBuilder principalBuilder; private final Map<String, AuthenticateCallbackHandler> callbackHandlers; @@ -168,8 +169,8 @@ public class SaslServerAuthenticator implements Authenticator { List<String> enabledMechanisms = (List<String>) this.configs.get(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG); if (enabledMechanisms == null || enabledMechanisms.isEmpty()) throw new IllegalArgumentException("No SASL mechanisms are enabled"); - this.enabledMechanisms = new HashSet<>(enabledMechanisms); - for (String mechanism : enabledMechanisms) { + this.enabledMechanisms = new ArrayList<String>(new HashSet<String>(enabledMechanisms)); + for (String mechanism : this.enabledMechanisms) { if (!callbackHandlers.containsKey(mechanism)) throw new IllegalArgumentException("Callback handler not specified for SASL mechanism " + mechanism); if (!subjects.containsKey(mechanism)) @@ -538,17 +539,19 @@ public class SaslServerAuthenticator implements Authenticator { } private String handleHandshakeRequest(RequestContext context, SaslHandshakeRequest handshakeRequest) throws IOException, UnsupportedSaslMechanismException { - String clientMechanism = handshakeRequest.mechanism(); + String clientMechanism = handshakeRequest.data().mechanism(); short version = context.header.apiVersion(); if (version >= 1) this.enableKafkaSaslAuthenticateHeaders(true); if (enabledMechanisms.contains(clientMechanism)) { LOG.debug("Using SASL mechanism '{}' provided by client", clientMechanism); - sendKafkaResponse(context, new SaslHandshakeResponse(Errors.NONE, enabledMechanisms)); + sendKafkaResponse(context, new SaslHandshakeResponse( + new SaslHandshakeResponseData().setErrorCode(Errors.NONE.code()).setMechanisms(enabledMechanisms))); return clientMechanism; } else { LOG.debug("SASL mechanism '{}' requested by client is not supported", clientMechanism); - buildResponseOnAuthenticateFailure(context, new SaslHandshakeResponse(Errors.UNSUPPORTED_SASL_MECHANISM, enabledMechanisms)); + buildResponseOnAuthenticateFailure(context, new SaslHandshakeResponse( + new SaslHandshakeResponseData().setErrorCode(Errors.UNSUPPORTED_SASL_MECHANISM.code()).setMechanisms(enabledMechanisms))); throw new UnsupportedSaslMechanismException("Unsupported SASL mechanism " + clientMechanism); } } diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 953b2bd..c105d3b 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -45,6 +45,8 @@ import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.Partiti import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.ReplicaElectionResult; import org.apache.kafka.common.message.LeaveGroupRequestData; import org.apache.kafka.common.message.LeaveGroupResponseData; +import org.apache.kafka.common.message.SaslHandshakeRequestData; +import org.apache.kafka.common.message.SaslHandshakeResponseData; import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.network.Send; import org.apache.kafka.common.protocol.ApiKeys; @@ -1011,11 +1013,14 @@ public class RequestResponseTest { } private SaslHandshakeRequest createSaslHandshakeRequest() { - return new SaslHandshakeRequest("PLAIN"); + return new SaslHandshakeRequest.Builder( + new SaslHandshakeRequestData().setMechanism("PLAIN")).build(); } private SaslHandshakeResponse createSaslHandshakeResponse() { - return new SaslHandshakeResponse(Errors.NONE, singletonList("GSSAPI")); + return new SaslHandshakeResponse( + new SaslHandshakeResponseData() + .setErrorCode(Errors.NONE.code()).setMechanisms(singletonList("GSSAPI"))); } private SaslAuthenticateRequest createSaslAuthenticateRequest() { diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java index c139062..97d114f 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java @@ -53,6 +53,7 @@ import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; import org.apache.kafka.common.config.types.Password; import org.apache.kafka.common.errors.SaslAuthenticationException; +import org.apache.kafka.common.message.SaslHandshakeRequestData; import org.apache.kafka.common.network.CertStores; import org.apache.kafka.common.network.ChannelBuilder; import org.apache.kafka.common.network.ChannelBuilders; @@ -708,8 +709,9 @@ public class SaslAuthenticatorTest { // Send SaslHandshakeRequest and validate that connection is closed by server. String node1 = "invalid1"; createClientConnection(SecurityProtocol.PLAINTEXT, node1); - SaslHandshakeRequest request = new SaslHandshakeRequest("PLAIN"); + SaslHandshakeRequest request = buildSaslHandshakeRequest("PLAIN", ApiKeys.SASL_HANDSHAKE.latestVersion()); RequestHeader header = new RequestHeader(ApiKeys.SASL_HANDSHAKE, Short.MAX_VALUE, "someclient", 2); + selector.send(request.toSend(node1, header)); // This test uses a non-SASL PLAINTEXT client in order to do manual handshake. // So the channel is in READY state. @@ -1715,7 +1717,7 @@ public class SaslAuthenticatorTest { servicePrincipal, serverHost, saslMechanism, true, transportLayer, time) { @Override protected SaslHandshakeRequest createSaslHandshakeRequest(short version) { - return new SaslHandshakeRequest.Builder(saslMechanism).build((short) 0); + return buildSaslHandshakeRequest(saslMechanism, (short) 0); } @Override protected void saslAuthenticateVersion(ApiVersionsResponse apiVersionsResponse) { @@ -1927,7 +1929,7 @@ public class SaslAuthenticatorTest { } private SaslHandshakeResponse sendHandshakeRequestReceiveResponse(String node, short version) throws Exception { - SaslHandshakeRequest handshakeRequest = new SaslHandshakeRequest.Builder("PLAIN").build(version); + SaslHandshakeRequest handshakeRequest = buildSaslHandshakeRequest("PLAIN", version); SaslHandshakeResponse response = (SaslHandshakeResponse) sendKafkaRequestReceiveResponse(node, ApiKeys.SASL_HANDSHAKE, handshakeRequest); assertEquals(Errors.NONE, response.error()); return response; @@ -1971,6 +1973,11 @@ public class SaslAuthenticatorTest { } } + private SaslHandshakeRequest buildSaslHandshakeRequest(String mechanism, short version) { + return new SaslHandshakeRequest.Builder( + new SaslHandshakeRequestData().setMechanism(mechanism)).build(version); + } + @SuppressWarnings("unchecked") private void updateScramCredentialCache(String username, String password) throws NoSuchAlgorithmException { for (String mechanism : (List<String>) saslServerConfigs.get(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG)) { @@ -2228,7 +2235,8 @@ public class SaslAuthenticatorTest { "PLAIN", true, transportLayer, time) { @Override protected SaslHandshakeRequest createSaslHandshakeRequest(short version) { - return new SaslHandshakeRequest.Builder("PLAIN").build(version); + return new SaslHandshakeRequest.Builder( + new SaslHandshakeRequestData().setMechanism("PLAIN")).build(version); } }; } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index c37edf6..bdd794c 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -48,6 +48,7 @@ import org.apache.kafka.common.message.CreateTopicsResponseData import org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, CreatableTopicResultSet} import org.apache.kafka.common.message.ElectPreferredLeadersResponseData import org.apache.kafka.common.message.LeaveGroupResponseData +import org.apache.kafka.common.message.SaslHandshakeResponseData import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.{ListenerName, Send} import org.apache.kafka.common.protocol.{ApiKeys, Errors} @@ -1359,7 +1360,8 @@ class KafkaApis(val requestChannel: RequestChannel, } def handleSaslHandshakeRequest(request: RequestChannel.Request) { - sendResponseMaybeThrottle(request, _ => new SaslHandshakeResponse(Errors.ILLEGAL_SASL_STATE, Collections.emptySet())) + val responseData = new SaslHandshakeResponseData().setErrorCode(Errors.ILLEGAL_SASL_STATE.code()) + sendResponseMaybeThrottle(request, _ => new SaslHandshakeResponse(responseData)) } def handleSaslAuthenticateRequest(request: RequestChannel.Request) { diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index b5531fe..2d82e7f 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -29,6 +29,7 @@ import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceP import org.apache.kafka.common.{Node, TopicPartition} import org.apache.kafka.common.message.CreateTopicsRequestData import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicSet} +import org.apache.kafka.common.message.SaslHandshakeRequestData import org.apache.kafka.common.metrics.{KafkaMetric, Quota, Sensor} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.ApiKeys @@ -273,7 +274,7 @@ class RequestQuotaTest extends BaseRequestTest { new ListGroupsRequest.Builder() case ApiKeys.SASL_HANDSHAKE => - new SaslHandshakeRequest.Builder("PLAIN") + new SaslHandshakeRequest.Builder(new SaslHandshakeRequestData().setMechanism("PLAIN")) case ApiKeys.SASL_AUTHENTICATE => new SaslAuthenticateRequest.Builder(ByteBuffer.wrap(new Array[Byte](0))) diff --git a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala index 00b9934..185a2f4 100644 --- a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala @@ -19,6 +19,7 @@ package kafka.server import java.net.Socket import java.util.Collections +import org.apache.kafka.common.message.SaslHandshakeRequestData import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse} import org.apache.kafka.common.requests.SaslHandshakeRequest @@ -95,7 +96,7 @@ class SaslApiVersionsRequestTest extends BaseRequestTest with SaslSetup { } private def sendSaslHandshakeRequestValidateResponse(socket: Socket) { - val request = new SaslHandshakeRequest("PLAIN") + val request = new SaslHandshakeRequest(new SaslHandshakeRequestData().setMechanism("PLAIN")) val response = sendAndReceive(request, ApiKeys.SASL_HANDSHAKE, socket) val handshakeResponse = SaslHandshakeResponse.parse(response, request.version) assertEquals(Errors.NONE, handshakeResponse.error)