This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4824dc9  KAFKA-7972: Use automatic RPC generation in SaslHandshake
4824dc9 is described below

commit 4824dc994d7fc56b7540b643a78aadb4bdd0f14d
Author: Mickael Maison <mickael.mai...@gmail.com>
AuthorDate: Mon Feb 25 11:20:07 2019 +0530

    KAFKA-7972: Use automatic RPC generation in SaslHandshake
    
    Author: Mickael Maison <mickael.mai...@gmail.com>
    
    Reviewers: Manikumar Reddy <manikumar.re...@gmail.com>
    
    Closes #6301 from mimaison/sasl-handshake
---
 checkstyle/import-control.xml                      |  1 +
 .../org/apache/kafka/common/protocol/ApiKeys.java  |  6 +-
 .../kafka/common/requests/AbstractResponse.java    |  2 +-
 .../common/requests/SaslHandshakeRequest.java      | 73 +++++++---------------
 .../common/requests/SaslHandshakeResponse.java     | 62 +++++-------------
 .../authenticator/SaslClientAuthenticator.java     |  4 +-
 .../authenticator/SaslServerAuthenticator.java     | 19 +++---
 .../kafka/common/requests/RequestResponseTest.java |  9 ++-
 .../authenticator/SaslAuthenticatorTest.java       | 16 +++--
 core/src/main/scala/kafka/server/KafkaApis.scala   |  4 +-
 .../scala/unit/kafka/server/RequestQuotaTest.scala |  3 +-
 .../kafka/server/SaslApiVersionsRequestTest.scala  |  3 +-
 12 files changed, 86 insertions(+), 116 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 4702011..ffc9bf9 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -102,6 +102,7 @@
       <allow pkg="org.apache.kafka.common.protocol" />
       <allow pkg="org.apache.kafka.common.errors" />
       <subpackage name="authenticator">
+        <allow pkg="org.apache.kafka.common.message" />
         <allow pkg="org.apache.kafka.common.protocol.types" />
         <allow pkg="org.apache.kafka.common.requests" />
         <allow pkg="org.apache.kafka.clients" />
diff --git 
a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 1acd4e1..937b044 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -22,6 +22,8 @@ import 
org.apache.kafka.common.message.ElectPreferredLeadersRequestData;
 import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
 import org.apache.kafka.common.message.LeaveGroupRequestData;
 import org.apache.kafka.common.message.LeaveGroupResponseData;
+import org.apache.kafka.common.message.SaslHandshakeRequestData;
+import org.apache.kafka.common.message.SaslHandshakeResponseData;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.SchemaException;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -97,8 +99,6 @@ import 
org.apache.kafka.common.requests.RenewDelegationTokenRequest;
 import org.apache.kafka.common.requests.RenewDelegationTokenResponse;
 import org.apache.kafka.common.requests.SaslAuthenticateRequest;
 import org.apache.kafka.common.requests.SaslAuthenticateResponse;
-import org.apache.kafka.common.requests.SaslHandshakeRequest;
-import org.apache.kafka.common.requests.SaslHandshakeResponse;
 import org.apache.kafka.common.requests.StopReplicaRequest;
 import org.apache.kafka.common.requests.StopReplicaResponse;
 import org.apache.kafka.common.requests.SyncGroupRequest;
@@ -142,7 +142,7 @@ public enum ApiKeys {
     DESCRIBE_GROUPS(15, "DescribeGroups", 
DescribeGroupsRequest.schemaVersions(),
             DescribeGroupsResponse.schemaVersions()),
     LIST_GROUPS(16, "ListGroups", ListGroupsRequest.schemaVersions(), 
ListGroupsResponse.schemaVersions()),
-    SASL_HANDSHAKE(17, "SaslHandshake", SaslHandshakeRequest.schemaVersions(), 
SaslHandshakeResponse.schemaVersions()),
+    SASL_HANDSHAKE(17, "SaslHandshake", SaslHandshakeRequestData.SCHEMAS, 
SaslHandshakeResponseData.SCHEMAS),
     API_VERSIONS(18, "ApiVersions", ApiVersionsRequest.schemaVersions(), 
ApiVersionsResponse.schemaVersions()) {
         @Override
         public Struct parseResponse(short version, ByteBuffer buffer) {
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java 
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index d9bb69e..68b505b 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -105,7 +105,7 @@ public abstract class AbstractResponse extends 
AbstractRequestResponse {
             case LIST_GROUPS:
                 return new ListGroupsResponse(struct);
             case SASL_HANDSHAKE:
-                return new SaslHandshakeResponse(struct);
+                return new SaslHandshakeResponse(struct, version);
             case API_VERSIONS:
                 return new ApiVersionsResponse(struct);
             case CREATE_TOPICS:
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
index 7225eb7..be7f4f8 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
@@ -16,17 +16,13 @@
  */
 package org.apache.kafka.common.requests;
 
+
+import org.apache.kafka.common.message.SaslHandshakeRequestData;
+import org.apache.kafka.common.message.SaslHandshakeResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.List;
-
-import static org.apache.kafka.common.protocol.types.Type.STRING;
 
 /**
  * Request from SASL client containing client SASL mechanism.
@@ -38,73 +34,54 @@ import static 
org.apache.kafka.common.protocol.types.Type.STRING;
  * making it easy to distinguish from a GSSAPI packet.
  */
 public class SaslHandshakeRequest extends AbstractRequest {
-    private static final String MECHANISM_KEY_NAME = "mechanism";
-
-    private static final Schema SASL_HANDSHAKE_REQUEST_V0 = new Schema(
-            new Field("mechanism", STRING, "SASL Mechanism chosen by the 
client."));
-
-    // SASL_HANDSHAKE_REQUEST_V1 added to support SASL_AUTHENTICATE request to 
improve diagnostics
-    private static final Schema SASL_HANDSHAKE_REQUEST_V1 = 
SASL_HANDSHAKE_REQUEST_V0;
-
-    public static Schema[] schemaVersions() {
-        return new Schema[]{SASL_HANDSHAKE_REQUEST_V0, 
SASL_HANDSHAKE_REQUEST_V1};
-    }
-
-    private final String mechanism;
 
     public static class Builder extends 
AbstractRequest.Builder<SaslHandshakeRequest> {
-        private final String mechanism;
+        private final SaslHandshakeRequestData data;
 
-        public Builder(String mechanism) {
+        public Builder(SaslHandshakeRequestData data) {
             super(ApiKeys.SASL_HANDSHAKE);
-            this.mechanism = mechanism;
+            this.data = data;
         }
 
         @Override
         public SaslHandshakeRequest build(short version) {
-            return new SaslHandshakeRequest(mechanism, version);
+            return new SaslHandshakeRequest(data, version);
         }
 
         @Override
         public String toString() {
-            StringBuilder bld = new StringBuilder();
-            bld.append("(type=SaslHandshakeRequest").
-                append(", mechanism=").append(mechanism).
-                append(")");
-            return bld.toString();
+            return data.toString();
         }
     }
 
-    public SaslHandshakeRequest(String mechanism) {
-        this(mechanism, ApiKeys.SASL_HANDSHAKE.latestVersion());
+    private final SaslHandshakeRequestData data;
+    private final short version;
+
+    public SaslHandshakeRequest(SaslHandshakeRequestData data) {
+        this(data, ApiKeys.SASL_HANDSHAKE.latestVersion());
     }
 
-    public SaslHandshakeRequest(String mechanism, short version) {
+    public SaslHandshakeRequest(SaslHandshakeRequestData data, short version) {
         super(ApiKeys.SASL_HANDSHAKE, version);
-        this.mechanism = mechanism;
+        this.data = data;
+        this.version = version;
     }
 
     public SaslHandshakeRequest(Struct struct, short version) {
         super(ApiKeys.SASL_HANDSHAKE, version);
-        mechanism = struct.getString(MECHANISM_KEY_NAME);
+        this.data = new SaslHandshakeRequestData(struct, version);
+        this.version = version;
     }
 
-    public String mechanism() {
-        return mechanism;
+    public SaslHandshakeRequestData data() {
+        return data;
     }
 
     @Override
     public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
-        short versionId = version();
-        switch (versionId) {
-            case 0:
-            case 1:
-                List<String> enabledMechanisms = Collections.emptyList();
-                return new SaslHandshakeResponse(Errors.forException(e), 
enabledMechanisms);
-            default:
-                throw new IllegalArgumentException(String.format("Version %d 
is not valid. Valid versions for %s are 0 to %d",
-                        versionId, this.getClass().getSimpleName(), 
ApiKeys.SASL_HANDSHAKE.latestVersion()));
-        }
+        SaslHandshakeResponseData response = new SaslHandshakeResponseData();
+        response.setErrorCode(ApiError.fromThrowable(e).error().code());
+        return new SaslHandshakeResponse(response);
     }
 
     public static SaslHandshakeRequest parse(ByteBuffer buffer, short version) 
{
@@ -113,9 +90,7 @@ public class SaslHandshakeRequest extends AbstractRequest {
 
     @Override
     protected Struct toStruct() {
-        Struct struct = new 
Struct(ApiKeys.SASL_HANDSHAKE.requestSchema(version()));
-        struct.set(MECHANISM_KEY_NAME, mechanism);
-        return struct;
+        return data.toStruct(version);
     }
 }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
index 9faa36c..939c5e8 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
@@ -16,84 +16,56 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.message.SaslHandshakeResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.protocol.types.Type;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
-
-
 /**
  * Response from SASL server which indicates if the client-chosen mechanism is 
enabled in the server.
  * For error responses, the list of enabled mechanisms is included in the 
response.
  */
 public class SaslHandshakeResponse extends AbstractResponse {
-    private static final String ENABLED_MECHANISMS_KEY_NAME = 
"enabled_mechanisms";
-
-    private static final Schema SASL_HANDSHAKE_RESPONSE_V0 = new Schema(
-            ERROR_CODE,
-            new Field(ENABLED_MECHANISMS_KEY_NAME, new ArrayOf(Type.STRING), 
"Array of mechanisms enabled in the server."));
-
-    private static final Schema SASL_HANDSHAKE_RESPONSE_V1 = 
SASL_HANDSHAKE_RESPONSE_V0;
-
-    public static Schema[] schemaVersions() {
-        return new Schema[]{SASL_HANDSHAKE_RESPONSE_V0, 
SASL_HANDSHAKE_RESPONSE_V1};
-    }
 
-    /**
-     * Possible error codes:
-     *   UNSUPPORTED_SASL_MECHANISM(33): Client mechanism not enabled in server
-     *   ILLEGAL_SASL_STATE(34) : Invalid request during SASL handshake
-     */
-    private final Errors error;
-    private final List<String> enabledMechanisms;
+    private final SaslHandshakeResponseData data;
 
-    public SaslHandshakeResponse(Errors error, Collection<String> 
enabledMechanisms) {
-        this.error = error;
-        this.enabledMechanisms = new ArrayList<>(enabledMechanisms);
+    public SaslHandshakeResponse(SaslHandshakeResponseData data) {
+        this.data = data;
     }
 
-    public SaslHandshakeResponse(Struct struct) {
-        error = Errors.forCode(struct.get(ERROR_CODE));
-        Object[] mechanisms = struct.getArray(ENABLED_MECHANISMS_KEY_NAME);
-        ArrayList<String> enabledMechanisms = new ArrayList<>();
-        for (Object mechanism : mechanisms)
-            enabledMechanisms.add((String) mechanism);
-        this.enabledMechanisms = enabledMechanisms;
+    public SaslHandshakeResponse(Struct struct, short version) {
+        this.data = new SaslHandshakeResponseData(struct, version);
     }
 
+    /*
+    * Possible error codes:
+    *   UNSUPPORTED_SASL_MECHANISM(33): Client mechanism not enabled in server
+    *   ILLEGAL_SASL_STATE(34) : Invalid request during SASL handshake
+    */
     public Errors error() {
-        return error;
+        return Errors.forCode(data.errorCode());
     }
 
     @Override
     public Map<Errors, Integer> errorCounts() {
-        return errorCounts(error);
+        return Collections.singletonMap(Errors.forCode(data.errorCode()), 1);
     }
 
     @Override
     public Struct toStruct(short version) {
-        Struct struct = new 
Struct(ApiKeys.SASL_HANDSHAKE.responseSchema(version));
-        struct.set(ERROR_CODE, error.code());
-        struct.set(ENABLED_MECHANISMS_KEY_NAME, enabledMechanisms.toArray());
-        return struct;
+        return data.toStruct(version);
     }
 
     public List<String> enabledMechanisms() {
-        return enabledMechanisms;
+        return data.mechanisms();
     }
 
     public static SaslHandshakeResponse parse(ByteBuffer buffer, short 
version) {
-        return new 
SaslHandshakeResponse(ApiKeys.SASL_HANDSHAKE.parseResponse(version, buffer));
+        return new 
SaslHandshakeResponse(ApiKeys.SASL_HANDSHAKE.parseResponse(version, buffer), 
version);
     }
 }
\ No newline at end of file
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
index 78428ee..cc26336 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.errors.IllegalSaslStateException;
 import org.apache.kafka.common.errors.SaslAuthenticationException;
 import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
+import org.apache.kafka.common.message.SaslHandshakeRequestData;
 import org.apache.kafka.common.network.Authenticator;
 import org.apache.kafka.common.network.NetworkSend;
 import org.apache.kafka.common.network.ReauthenticationContext;
@@ -328,7 +329,8 @@ public class SaslClientAuthenticator implements 
Authenticator {
 
     // Visible to override for testing
     protected SaslHandshakeRequest createSaslHandshakeRequest(short version) {
-        return new SaslHandshakeRequest.Builder(mechanism).build(version);
+        return new SaslHandshakeRequest.Builder(
+                new 
SaslHandshakeRequestData().setMechanism(mechanism)).build(version);
     }
 
     // Visible to override for testing
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index 1e62e7f..ccd94fc 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.SaslAuthenticationException;
 import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.SaslHandshakeResponseData;
 import org.apache.kafka.common.network.Authenticator;
 import org.apache.kafka.common.network.ChannelBuilders;
 import org.apache.kafka.common.network.ListenerName;
@@ -35,7 +36,6 @@ import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.network.TransportLayer;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.requests.ApiVersionsRequest;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
@@ -50,6 +50,7 @@ import 
org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
 import org.apache.kafka.common.security.auth.SaslAuthenticationContext;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.security.kerberos.KerberosError;
 import org.apache.kafka.common.security.kerberos.KerberosName;
 import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
@@ -77,12 +78,12 @@ import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.security.PrivilegedActionException;
 import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
 
 public class SaslServerAuthenticator implements Authenticator {
     // GSSAPI limits requests to 64K, but we allow a bit extra for custom SASL 
mechanisms
@@ -118,7 +119,7 @@ public class SaslServerAuthenticator implements 
Authenticator {
     private final String connectionId;
     private final Map<String, Subject> subjects;
     private final TransportLayer transportLayer;
-    private final Set<String> enabledMechanisms;
+    private final List<String> enabledMechanisms;
     private final Map<String, ?> configs;
     private final KafkaPrincipalBuilder principalBuilder;
     private final Map<String, AuthenticateCallbackHandler> callbackHandlers;
@@ -168,8 +169,8 @@ public class SaslServerAuthenticator implements 
Authenticator {
         List<String> enabledMechanisms = (List<String>) 
this.configs.get(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG);
         if (enabledMechanisms == null || enabledMechanisms.isEmpty())
             throw new IllegalArgumentException("No SASL mechanisms are 
enabled");
-        this.enabledMechanisms = new HashSet<>(enabledMechanisms);
-        for (String mechanism : enabledMechanisms) {
+        this.enabledMechanisms = new ArrayList<String>(new 
HashSet<String>(enabledMechanisms));
+        for (String mechanism : this.enabledMechanisms) {
             if (!callbackHandlers.containsKey(mechanism))
                 throw new IllegalArgumentException("Callback handler not 
specified for SASL mechanism " + mechanism);
             if (!subjects.containsKey(mechanism))
@@ -538,17 +539,19 @@ public class SaslServerAuthenticator implements 
Authenticator {
     }
 
     private String handleHandshakeRequest(RequestContext context, 
SaslHandshakeRequest handshakeRequest) throws IOException, 
UnsupportedSaslMechanismException {
-        String clientMechanism = handshakeRequest.mechanism();
+        String clientMechanism = handshakeRequest.data().mechanism();
         short version = context.header.apiVersion();
         if (version >= 1)
             this.enableKafkaSaslAuthenticateHeaders(true);
         if (enabledMechanisms.contains(clientMechanism)) {
             LOG.debug("Using SASL mechanism '{}' provided by client", 
clientMechanism);
-            sendKafkaResponse(context, new SaslHandshakeResponse(Errors.NONE, 
enabledMechanisms));
+            sendKafkaResponse(context, new SaslHandshakeResponse(
+                    new 
SaslHandshakeResponseData().setErrorCode(Errors.NONE.code()).setMechanisms(enabledMechanisms)));
             return clientMechanism;
         } else {
             LOG.debug("SASL mechanism '{}' requested by client is not 
supported", clientMechanism);
-            buildResponseOnAuthenticateFailure(context, new 
SaslHandshakeResponse(Errors.UNSUPPORTED_SASL_MECHANISM, enabledMechanisms));
+            buildResponseOnAuthenticateFailure(context, new 
SaslHandshakeResponse(
+                    new 
SaslHandshakeResponseData().setErrorCode(Errors.UNSUPPORTED_SASL_MECHANISM.code()).setMechanisms(enabledMechanisms)));
             throw new UnsupportedSaslMechanismException("Unsupported SASL 
mechanism " + clientMechanism);
         }
     }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 953b2bd..c105d3b 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -45,6 +45,8 @@ import 
org.apache.kafka.common.message.ElectPreferredLeadersResponseData.Partiti
 import 
org.apache.kafka.common.message.ElectPreferredLeadersResponseData.ReplicaElectionResult;
 import org.apache.kafka.common.message.LeaveGroupRequestData;
 import org.apache.kafka.common.message.LeaveGroupResponseData;
+import org.apache.kafka.common.message.SaslHandshakeRequestData;
+import org.apache.kafka.common.message.SaslHandshakeResponseData;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.protocol.ApiKeys;
@@ -1011,11 +1013,14 @@ public class RequestResponseTest {
     }
 
     private SaslHandshakeRequest createSaslHandshakeRequest() {
-        return new SaslHandshakeRequest("PLAIN");
+        return new SaslHandshakeRequest.Builder(
+                new SaslHandshakeRequestData().setMechanism("PLAIN")).build();
     }
 
     private SaslHandshakeResponse createSaslHandshakeResponse() {
-        return new SaslHandshakeResponse(Errors.NONE, singletonList("GSSAPI"));
+        return new SaslHandshakeResponse(
+                new SaslHandshakeResponseData()
+                
.setErrorCode(Errors.NONE.code()).setMechanisms(singletonList("GSSAPI")));
     }
 
     private SaslAuthenticateRequest createSaslAuthenticateRequest() {
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index c139062..97d114f 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -53,6 +53,7 @@ import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 import org.apache.kafka.common.config.types.Password;
 import org.apache.kafka.common.errors.SaslAuthenticationException;
+import org.apache.kafka.common.message.SaslHandshakeRequestData;
 import org.apache.kafka.common.network.CertStores;
 import org.apache.kafka.common.network.ChannelBuilder;
 import org.apache.kafka.common.network.ChannelBuilders;
@@ -708,8 +709,9 @@ public class SaslAuthenticatorTest {
         // Send SaslHandshakeRequest and validate that connection is closed by 
server.
         String node1 = "invalid1";
         createClientConnection(SecurityProtocol.PLAINTEXT, node1);
-        SaslHandshakeRequest request = new SaslHandshakeRequest("PLAIN");
+        SaslHandshakeRequest request = buildSaslHandshakeRequest("PLAIN", 
ApiKeys.SASL_HANDSHAKE.latestVersion());
         RequestHeader header = new RequestHeader(ApiKeys.SASL_HANDSHAKE, 
Short.MAX_VALUE, "someclient", 2);
+        
         selector.send(request.toSend(node1, header));
         // This test uses a non-SASL PLAINTEXT client in order to do manual 
handshake.
         // So the channel is in READY state.
@@ -1715,7 +1717,7 @@ public class SaslAuthenticatorTest {
                         servicePrincipal, serverHost, saslMechanism, true, 
transportLayer, time) {
                     @Override
                     protected SaslHandshakeRequest 
createSaslHandshakeRequest(short version) {
-                        return new 
SaslHandshakeRequest.Builder(saslMechanism).build((short) 0);
+                        return buildSaslHandshakeRequest(saslMechanism, 
(short) 0);
                     }
                     @Override
                     protected void saslAuthenticateVersion(ApiVersionsResponse 
apiVersionsResponse) {
@@ -1927,7 +1929,7 @@ public class SaslAuthenticatorTest {
     }
 
     private SaslHandshakeResponse sendHandshakeRequestReceiveResponse(String 
node, short version) throws Exception {
-        SaslHandshakeRequest handshakeRequest = new 
SaslHandshakeRequest.Builder("PLAIN").build(version);
+        SaslHandshakeRequest handshakeRequest = 
buildSaslHandshakeRequest("PLAIN", version);
         SaslHandshakeResponse response = (SaslHandshakeResponse) 
sendKafkaRequestReceiveResponse(node, ApiKeys.SASL_HANDSHAKE, handshakeRequest);
         assertEquals(Errors.NONE, response.error());
         return response;
@@ -1971,6 +1973,11 @@ public class SaslAuthenticatorTest {
         }
     }
 
+    private SaslHandshakeRequest buildSaslHandshakeRequest(String mechanism, 
short version) {
+        return new SaslHandshakeRequest.Builder(
+                new 
SaslHandshakeRequestData().setMechanism(mechanism)).build(version);
+    }
+
     @SuppressWarnings("unchecked")
     private void updateScramCredentialCache(String username, String password) 
throws NoSuchAlgorithmException {
         for (String mechanism : (List<String>) 
saslServerConfigs.get(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG)) {
@@ -2228,7 +2235,8 @@ public class SaslAuthenticatorTest {
                         "PLAIN", true, transportLayer, time) {
                     @Override
                     protected SaslHandshakeRequest 
createSaslHandshakeRequest(short version) {
-                        return new 
SaslHandshakeRequest.Builder("PLAIN").build(version);
+                        return new SaslHandshakeRequest.Builder(
+                                new 
SaslHandshakeRequestData().setMechanism("PLAIN")).build(version);
                     }
                 };
         }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index c37edf6..bdd794c 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -48,6 +48,7 @@ import 
org.apache.kafka.common.message.CreateTopicsResponseData
 import 
org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, 
CreatableTopicResultSet}
 import org.apache.kafka.common.message.ElectPreferredLeadersResponseData
 import org.apache.kafka.common.message.LeaveGroupResponseData
+import org.apache.kafka.common.message.SaslHandshakeResponseData
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.{ListenerName, Send}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -1359,7 +1360,8 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleSaslHandshakeRequest(request: RequestChannel.Request) {
-    sendResponseMaybeThrottle(request, _ => new 
SaslHandshakeResponse(Errors.ILLEGAL_SASL_STATE, Collections.emptySet()))
+    val responseData = new 
SaslHandshakeResponseData().setErrorCode(Errors.ILLEGAL_SASL_STATE.code())
+    sendResponseMaybeThrottle(request, _ => new 
SaslHandshakeResponse(responseData))
   }
 
   def handleSaslAuthenticateRequest(request: RequestChannel.Request) {
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala 
b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index b5531fe..2d82e7f 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -29,6 +29,7 @@ import org.apache.kafka.common.resource.{PatternType, 
ResourcePattern, ResourceP
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.apache.kafka.common.message.CreateTopicsRequestData
 import 
org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, 
CreatableTopicSet}
+import org.apache.kafka.common.message.SaslHandshakeRequestData
 import org.apache.kafka.common.metrics.{KafkaMetric, Quota, Sensor}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.ApiKeys
@@ -273,7 +274,7 @@ class RequestQuotaTest extends BaseRequestTest {
           new ListGroupsRequest.Builder()
 
         case ApiKeys.SASL_HANDSHAKE =>
-          new SaslHandshakeRequest.Builder("PLAIN")
+          new SaslHandshakeRequest.Builder(new 
SaslHandshakeRequestData().setMechanism("PLAIN"))
 
         case ApiKeys.SASL_AUTHENTICATE =>
           new SaslAuthenticateRequest.Builder(ByteBuffer.wrap(new 
Array[Byte](0)))
diff --git 
a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
index 00b9934..185a2f4 100644
--- a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala
@@ -19,6 +19,7 @@ package kafka.server
 import java.net.Socket
 import java.util.Collections
 
+import org.apache.kafka.common.message.SaslHandshakeRequestData
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{ApiVersionsRequest, 
ApiVersionsResponse}
 import org.apache.kafka.common.requests.SaslHandshakeRequest
@@ -95,7 +96,7 @@ class SaslApiVersionsRequestTest extends BaseRequestTest with 
SaslSetup {
   }
 
   private def sendSaslHandshakeRequestValidateResponse(socket: Socket) {
-    val request = new SaslHandshakeRequest("PLAIN")
+    val request = new SaslHandshakeRequest(new 
SaslHandshakeRequestData().setMechanism("PLAIN"))
     val response = sendAndReceive(request, ApiKeys.SASL_HANDSHAKE, socket)
     val handshakeResponse = SaslHandshakeResponse.parse(response, 
request.version)
     assertEquals(Errors.NONE, handshakeResponse.error)

Reply via email to