This is an automated email from the ASF dual-hosted git repository.
manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0d56f14 KAFKA-7997: Use automatic RPC generation in SaslAuthenticate
0d56f14 is described below
commit 0d56f1413557adabc736cae2dffcdc56a620403e
Author: Mickael Maison <[email protected]>
AuthorDate: Sat Mar 2 21:28:30 2019 +0530
KAFKA-7997: Use automatic RPC generation in SaslAuthenticate
Author: Mickael Maison <[email protected]>
Reviewers: Manikumar Reddy <[email protected]>
Closes #6324 from mimaison/sasl-authenticate
---
.../org/apache/kafka/common/protocol/ApiKeys.java | 8 +-
.../kafka/common/requests/AbstractResponse.java | 2 +-
.../common/requests/SaslAuthenticateRequest.java | 63 ++++++----------
.../common/requests/SaslAuthenticateResponse.java | 86 +++++-----------------
.../authenticator/SaslClientAuthenticator.java | 7 +-
.../authenticator/SaslServerAuthenticator.java | 24 ++++--
.../java/org/apache/kafka/common/utils/Utils.java | 9 +++
.../kafka/common/requests/RequestResponseTest.java | 11 ++-
.../authenticator/SaslAuthenticatorTest.java | 4 +-
core/src/main/scala/kafka/server/KafkaApis.scala | 9 ++-
.../scala/unit/kafka/server/RequestQuotaTest.scala | 3 +-
11 files changed, 98 insertions(+), 128 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 19bf6f0..0a19939 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -24,6 +24,8 @@ import
org.apache.kafka.common.message.ElectPreferredLeadersRequestData;
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData;
import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.LeaveGroupResponseData;
+import org.apache.kafka.common.message.SaslAuthenticateRequestData;
+import org.apache.kafka.common.message.SaslAuthenticateResponseData;
import org.apache.kafka.common.message.SaslHandshakeRequestData;
import org.apache.kafka.common.message.SaslHandshakeResponseData;
import org.apache.kafka.common.protocol.types.Schema;
@@ -97,8 +99,6 @@ import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.requests.RenewDelegationTokenRequest;
import org.apache.kafka.common.requests.RenewDelegationTokenResponse;
-import org.apache.kafka.common.requests.SaslAuthenticateRequest;
-import org.apache.kafka.common.requests.SaslAuthenticateResponse;
import org.apache.kafka.common.requests.StopReplicaRequest;
import org.apache.kafka.common.requests.StopReplicaResponse;
import org.apache.kafka.common.requests.SyncGroupRequest;
@@ -180,8 +180,8 @@ public enum ApiKeys {
AlterReplicaLogDirsResponse.schemaVersions()),
DESCRIBE_LOG_DIRS(35, "DescribeLogDirs",
DescribeLogDirsRequest.schemaVersions(),
DescribeLogDirsResponse.schemaVersions()),
- SASL_AUTHENTICATE(36, "SaslAuthenticate",
SaslAuthenticateRequest.schemaVersions(),
- SaslAuthenticateResponse.schemaVersions()),
+ SASL_AUTHENTICATE(36, "SaslAuthenticate",
SaslAuthenticateRequestData.SCHEMAS,
+ SaslAuthenticateResponseData.SCHEMAS),
CREATE_PARTITIONS(37, "CreatePartitions",
CreatePartitionsRequest.schemaVersions(),
CreatePartitionsResponse.schemaVersions()),
CREATE_DELEGATION_TOKEN(38, "CreateDelegationToken",
CreateDelegationTokenRequest.schemaVersions(),
CreateDelegationTokenResponse.schemaVersions()),
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 959379c..712d732 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -143,7 +143,7 @@ public abstract class AbstractResponse extends
AbstractRequestResponse {
case DESCRIBE_LOG_DIRS:
return new DescribeLogDirsResponse(struct);
case SASL_AUTHENTICATE:
- return new SaslAuthenticateResponse(struct);
+ return new SaslAuthenticateResponse(struct, version);
case CREATE_PARTITIONS:
return new CreatePartitionsResponse(struct);
case CREATE_DELEGATION_TOKEN:
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateRequest.java
index b6da7ea..d136e78 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateRequest.java
@@ -16,15 +16,13 @@
*/
package org.apache.kafka.common.requests;
+import org.apache.kafka.common.message.SaslAuthenticateRequestData;
+import org.apache.kafka.common.message.SaslAuthenticateResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
-import static org.apache.kafka.common.protocol.types.Type.BYTES;
/**
* Request from SASL client containing client SASL authentication token as
defined by the
@@ -35,31 +33,18 @@ import static
org.apache.kafka.common.protocol.types.Type.BYTES;
* brokers will send SaslHandshake request v0 followed by SASL tokens without
the Kafka request headers.
*/
public class SaslAuthenticateRequest extends AbstractRequest {
- private static final String SASL_AUTH_BYTES_KEY_NAME = "sasl_auth_bytes";
-
- private static final Schema SASL_AUTHENTICATE_REQUEST_V0 = new Schema(
- new Field(SASL_AUTH_BYTES_KEY_NAME, BYTES, "SASL authentication
bytes from client as defined by the SASL mechanism."));
-
- /* v1 request is the same as v0; session_lifetime_ms has been added to the
response */
- private static final Schema SASL_AUTHENTICATE_REQUEST_V1 =
SASL_AUTHENTICATE_REQUEST_V0;
-
- public static Schema[] schemaVersions() {
- return new Schema[]{SASL_AUTHENTICATE_REQUEST_V0,
SASL_AUTHENTICATE_REQUEST_V1};
- }
-
- private final ByteBuffer saslAuthBytes;
public static class Builder extends
AbstractRequest.Builder<SaslAuthenticateRequest> {
- private final ByteBuffer saslAuthBytes;
+ private final SaslAuthenticateRequestData data;
- public Builder(ByteBuffer saslAuthBytes) {
+ public Builder(SaslAuthenticateRequestData data) {
super(ApiKeys.SASL_AUTHENTICATE);
- this.saslAuthBytes = saslAuthBytes;
+ this.data = data;
}
@Override
public SaslAuthenticateRequest build(short version) {
- return new SaslAuthenticateRequest(saslAuthBytes, version);
+ return new SaslAuthenticateRequest(data, version);
}
@Override
@@ -70,35 +55,35 @@ public class SaslAuthenticateRequest extends
AbstractRequest {
}
}
- public SaslAuthenticateRequest(ByteBuffer saslAuthBytes) {
- this(saslAuthBytes, ApiKeys.SASL_AUTHENTICATE.latestVersion());
+ private final SaslAuthenticateRequestData data;
+ private final short version;
+
+ public SaslAuthenticateRequest(SaslAuthenticateRequestData data) {
+ this(data, ApiKeys.SASL_AUTHENTICATE.latestVersion());
}
- public SaslAuthenticateRequest(ByteBuffer saslAuthBytes, short version) {
+ public SaslAuthenticateRequest(SaslAuthenticateRequestData data, short
version) {
super(ApiKeys.SASL_AUTHENTICATE, version);
- this.saslAuthBytes = saslAuthBytes;
+ this.data = data;
+ this.version = version;
}
public SaslAuthenticateRequest(Struct struct, short version) {
super(ApiKeys.SASL_AUTHENTICATE, version);
- saslAuthBytes = struct.getBytes(SASL_AUTH_BYTES_KEY_NAME);
+ this.data = new SaslAuthenticateRequestData(struct, version);
+ this.version = version;
}
- public ByteBuffer saslAuthBytes() {
- return saslAuthBytes;
+ public SaslAuthenticateRequestData data() {
+ return data;
}
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
- short versionId = version();
- switch (versionId) {
- case 0:
- case 1:
- return new SaslAuthenticateResponse(Errors.forException(e),
e.getMessage());
- default:
- throw new IllegalArgumentException(String.format("Version %d
is not valid. Valid versions for %s are 0 to %d",
- versionId, this.getClass().getSimpleName(),
ApiKeys.SASL_AUTHENTICATE.latestVersion()));
- }
+ SaslAuthenticateResponseData response = new
SaslAuthenticateResponseData()
+ .setErrorCode(ApiError.fromThrowable(e).error().code())
+ .setErrorMessage(e.getMessage());
+ return new SaslAuthenticateResponse(response);
}
public static SaslAuthenticateRequest parse(ByteBuffer buffer, short
version) {
@@ -107,9 +92,7 @@ public class SaslAuthenticateRequest extends AbstractRequest
{
@Override
protected Struct toStruct() {
- Struct struct = new
Struct(ApiKeys.SASL_AUTHENTICATE.requestSchema(version()));
- struct.set(SASL_AUTH_BYTES_KEY_NAME, saslAuthBytes);
- return struct;
+ return data.toStruct(version);
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
index 402d04d..fc8832a 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
@@ -16,111 +16,63 @@
*/
package org.apache.kafka.common.requests;
+import org.apache.kafka.common.message.SaslAuthenticateResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.Map;
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_MESSAGE;
-import static org.apache.kafka.common.protocol.types.Type.BYTES;
-import static org.apache.kafka.common.protocol.types.Type.INT64;
-
/**
* Response from SASL server which for a SASL challenge as defined by the SASL
protocol
* for the mechanism configured for the client.
*/
public class SaslAuthenticateResponse extends AbstractResponse {
- private static final String SASL_AUTH_BYTES_KEY_NAME = "sasl_auth_bytes";
- private static final String SESSION_LIFETIME_MS = "session_lifetime_ms";
-
- private static final Schema SASL_AUTHENTICATE_RESPONSE_V0 = new Schema(
- ERROR_CODE,
- ERROR_MESSAGE,
- new Field(SASL_AUTH_BYTES_KEY_NAME, BYTES, "SASL authentication
bytes from server as defined by the SASL mechanism."));
- private static final Schema SASL_AUTHENTICATE_RESPONSE_V1 = new Schema(
- ERROR_CODE,
- ERROR_MESSAGE,
- new Field(SASL_AUTH_BYTES_KEY_NAME, BYTES, "SASL authentication
bytes from server as defined by the SASL mechanism."),
- new Field(SESSION_LIFETIME_MS, INT64, "Number of milliseconds
after which only re-authentication over the existing connection to create a new
session can occur."));
+ private final SaslAuthenticateResponseData data;
- public static Schema[] schemaVersions() {
- return new Schema[]{SASL_AUTHENTICATE_RESPONSE_V0,
SASL_AUTHENTICATE_RESPONSE_V1};
+ public SaslAuthenticateResponse(SaslAuthenticateResponseData data) {
+ this.data = data;
}
- private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
-
- private final ByteBuffer saslAuthBytes;
+ public SaslAuthenticateResponse(Struct struct, short version) {
+ this.data = new SaslAuthenticateResponseData(struct, version);
+ }
/**
* Possible error codes:
* SASL_AUTHENTICATION_FAILED(57) : Authentication failed
*/
- private final Errors error;
- private final String errorMessage;
- private final long sessionLifetimeMs;
-
- public SaslAuthenticateResponse(Errors error, String errorMessage) {
- this(error, errorMessage, EMPTY_BUFFER);
- }
-
- public SaslAuthenticateResponse(Errors error, String errorMessage,
ByteBuffer saslAuthBytes) {
- this(error, errorMessage, saslAuthBytes, 0L);
- }
-
- public SaslAuthenticateResponse(Errors error, String errorMessage,
ByteBuffer saslAuthBytes, long sessionLifetimeMs) {
- this.error = error;
- this.errorMessage = errorMessage;
- this.saslAuthBytes = saslAuthBytes;
- this.sessionLifetimeMs = sessionLifetimeMs;
- }
-
- public SaslAuthenticateResponse(Struct struct) {
- error = Errors.forCode(struct.get(ERROR_CODE));
- errorMessage = struct.get(ERROR_MESSAGE);
- saslAuthBytes = struct.getBytes(SASL_AUTH_BYTES_KEY_NAME);
- sessionLifetimeMs = struct.hasField(SESSION_LIFETIME_MS) ?
struct.getLong(SESSION_LIFETIME_MS).longValue() : 0L;
- }
-
public Errors error() {
- return error;
+ return Errors.forCode(data.errorCode());
}
- public String errorMessage() {
- return errorMessage;
+ @Override
+ public Map<Errors, Integer> errorCounts() {
+ return Collections.singletonMap(Errors.forCode(data.errorCode()), 1);
}
- public ByteBuffer saslAuthBytes() {
- return saslAuthBytes;
+ public String errorMessage() {
+ return data.errorMessage();
}
public long sessionLifetimeMs() {
- return sessionLifetimeMs;
+ return data.sessionLifetimeMs();
}
- @Override
- public Map<Errors, Integer> errorCounts() {
- return errorCounts(error);
+ public byte[] saslAuthBytes() {
+ return data.authBytes();
}
@Override
public Struct toStruct(short version) {
- Struct struct = new
Struct(ApiKeys.SASL_AUTHENTICATE.responseSchema(version));
- struct.set(ERROR_CODE, error.code());
- struct.set(ERROR_MESSAGE, errorMessage);
- struct.set(SASL_AUTH_BYTES_KEY_NAME, saslAuthBytes);
- if (version > 0)
- struct.set(SESSION_LIFETIME_MS, sessionLifetimeMs);
- return struct;
+ return data.toStruct(version);
}
public static SaslAuthenticateResponse parse(ByteBuffer buffer, short
version) {
- return new
SaslAuthenticateResponse(ApiKeys.SASL_AUTHENTICATE.parseResponse(version,
buffer));
+ return new
SaslAuthenticateResponse(ApiKeys.SASL_AUTHENTICATE.parseResponse(version,
buffer), version);
}
}
\ No newline at end of file
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
index cc26336..3133f44 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.errors.IllegalSaslStateException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
+import org.apache.kafka.common.message.SaslAuthenticateRequestData;
import org.apache.kafka.common.message.SaslHandshakeRequestData;
import org.apache.kafka.common.network.Authenticator;
import org.apache.kafka.common.network.NetworkSend;
@@ -373,7 +374,9 @@ public class SaslClientAuthenticator implements
Authenticator {
if (saslToken != null) {
ByteBuffer tokenBuf = ByteBuffer.wrap(saslToken);
if (saslAuthenticateVersion !=
DISABLE_KAFKA_SASL_AUTHENTICATE_HEADER) {
- SaslAuthenticateRequest request = new
SaslAuthenticateRequest.Builder(tokenBuf).build(saslAuthenticateVersion);
+ SaslAuthenticateRequestData data = new
SaslAuthenticateRequestData()
+ .setAuthBytes(tokenBuf.array());
+ SaslAuthenticateRequest request = new
SaslAuthenticateRequest.Builder(data).build(saslAuthenticateVersion);
tokenBuf =
request.serialize(nextRequestHeader(ApiKeys.SASL_AUTHENTICATE,
saslAuthenticateVersion));
}
send(new NetworkSend(node, tokenBuf));
@@ -445,7 +448,7 @@ public class SaslClientAuthenticator implements
Authenticator {
long sessionLifetimeMs = response.sessionLifetimeMs();
if (sessionLifetimeMs > 0L)
reauthInfo.positiveSessionLifetimeMs = sessionLifetimeMs;
- return Utils.readBytes(response.saslAuthBytes());
+ return Utils.copyArray(response.saslAuthBytes());
} else
return null;
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index ccd94fc..7aca177 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.SaslAuthenticateResponseData;
import org.apache.kafka.common.message.SaslHandshakeResponseData;
import org.apache.kafka.common.network.Authenticator;
import org.apache.kafka.common.network.ChannelBuilders;
@@ -89,7 +90,6 @@ public class SaslServerAuthenticator implements Authenticator
{
// GSSAPI limits requests to 64K, but we allow a bit extra for custom SASL
mechanisms
static final int MAX_RECEIVE_SIZE = 524288;
private static final Logger LOG =
LoggerFactory.getLogger(SaslServerAuthenticator.class);
- private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
/**
* The internal state transitions for initial authentication of a channel
on the
@@ -448,17 +448,25 @@ public class SaslServerAuthenticator implements
Authenticator {
SaslAuthenticateRequest saslAuthenticateRequest =
(SaslAuthenticateRequest) requestAndSize.request;
try {
- byte[] responseToken =
saslServer.evaluateResponse(Utils.readBytes(saslAuthenticateRequest.saslAuthBytes()));
+ byte[] responseToken = saslServer.evaluateResponse(
+
Utils.copyArray(saslAuthenticateRequest.data().authBytes()));
if (reauthInfo.reauthenticating() && saslServer.isComplete())
reauthInfo.ensurePrincipalUnchanged(principal());
// For versions with SASL_AUTHENTICATE header, send a response
to SASL_AUTHENTICATE request even if token is empty.
- ByteBuffer responseBuf = responseToken == null ? EMPTY_BUFFER
: ByteBuffer.wrap(responseToken);
+ byte[] responseBytes = responseToken == null ? new byte[0] :
responseToken;
long sessionLifetimeMs = !saslServer.isComplete() ? 0L
:
reauthInfo.calcCompletionTimesAndReturnSessionLifetimeMs();
- sendKafkaResponse(requestContext, new
SaslAuthenticateResponse(Errors.NONE, null, responseBuf, sessionLifetimeMs));
+ sendKafkaResponse(requestContext, new SaslAuthenticateResponse(
+ new SaslAuthenticateResponseData()
+ .setErrorCode(Errors.NONE.code())
+ .setAuthBytes(responseBytes)
+ .setSessionLifetimeMs(sessionLifetimeMs)));
} catch (SaslAuthenticationException e) {
buildResponseOnAuthenticateFailure(requestContext,
- new
SaslAuthenticateResponse(Errors.SASL_AUTHENTICATION_FAILED, e.getMessage()));
+ new SaslAuthenticateResponse(
+ new SaslAuthenticateResponseData()
+
.setErrorCode(Errors.SASL_AUTHENTICATION_FAILED.code())
+ .setErrorMessage(e.getMessage())));
throw e;
} catch (SaslException e) {
KerberosError kerberosError = KerberosError.fromException(e);
@@ -471,8 +479,10 @@ public class SaslServerAuthenticator implements
Authenticator {
String errorMessage = "Authentication failed during "
+ reauthInfo.authenticationOrReauthenticationText()
+ " due to invalid credentials with SASL mechanism
" + saslMechanism;
- sendKafkaResponse(requestContext, new
SaslAuthenticateResponse(Errors.SASL_AUTHENTICATION_FAILED,
- errorMessage));
+ sendKafkaResponse(requestContext, new
SaslAuthenticateResponse(
+ new SaslAuthenticateResponseData()
+
.setErrorCode(Errors.SASL_AUTHENTICATION_FAILED.code())
+ .setErrorMessage(errorMessage)));
throw new SaslAuthenticationException(errorMessage, e);
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 53417d2..5d2a5cf 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -275,6 +275,15 @@ public final class Utils {
}
/**
+ * Returns a copy of src byte array
+ * @param src The byte array to copy
+ * @return The copy
+ */
+ public static byte[] copyArray(byte[] src) {
+ return Arrays.copyOf(src, src.length);
+ }
+
+ /**
* Check that the parameter t is not null
*
* @param t The object to check
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index f1e2063..5d60086 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -47,6 +47,8 @@ import
org.apache.kafka.common.message.ElectPreferredLeadersResponseData.Partiti
import
org.apache.kafka.common.message.ElectPreferredLeadersResponseData.ReplicaElectionResult;
import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.LeaveGroupResponseData;
+import org.apache.kafka.common.message.SaslAuthenticateRequestData;
+import org.apache.kafka.common.message.SaslAuthenticateResponseData;
import org.apache.kafka.common.message.SaslHandshakeRequestData;
import org.apache.kafka.common.message.SaslHandshakeResponseData;
import org.apache.kafka.common.network.ListenerName;
@@ -1029,11 +1031,16 @@ public class RequestResponseTest {
}
private SaslAuthenticateRequest createSaslAuthenticateRequest() {
- return new SaslAuthenticateRequest(ByteBuffer.wrap(new byte[0]));
+ SaslAuthenticateRequestData data = new
SaslAuthenticateRequestData().setAuthBytes(new byte[0]);
+ return new SaslAuthenticateRequest(data);
}
private SaslAuthenticateResponse createSaslAuthenticateResponse() {
- return new SaslAuthenticateResponse(Errors.NONE, null,
ByteBuffer.wrap(new byte[0]), Long.MAX_VALUE);
+ SaslAuthenticateResponseData data = new SaslAuthenticateResponseData()
+ .setErrorCode(Errors.NONE.code())
+ .setAuthBytes(new byte[0])
+ .setSessionLifetimeMs(Long.MAX_VALUE);
+ return new SaslAuthenticateResponse(data);
}
private ApiVersionsRequest createApiVersionRequest() {
diff --git
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index 97d114f..5cfecf8 100644
---
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -53,6 +53,7 @@ import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.errors.SaslAuthenticationException;
+import org.apache.kafka.common.message.SaslAuthenticateRequestData;
import org.apache.kafka.common.message.SaslHandshakeRequestData;
import org.apache.kafka.common.network.CertStores;
import org.apache.kafka.common.network.ChannelBuilder;
@@ -1796,7 +1797,8 @@ public class SaslAuthenticatorTest {
String authString = "\u0000" + TestJaasConfig.USERNAME + "\u0000" +
TestJaasConfig.PASSWORD;
ByteBuffer authBuf = ByteBuffer.wrap(authString.getBytes("UTF-8"));
if (enableSaslAuthenticateHeader) {
- SaslAuthenticateRequest request = new
SaslAuthenticateRequest.Builder(authBuf).build();
+ SaslAuthenticateRequestData data = new
SaslAuthenticateRequestData().setAuthBytes(authBuf.array());
+ SaslAuthenticateRequest request = new
SaslAuthenticateRequest.Builder(data).build();
sendKafkaRequestReceiveResponse(node, ApiKeys.SASL_AUTHENTICATE,
request);
} else {
selector.send(new NetworkSend(node, authBuf));
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 2361ee5..faf338e 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -49,6 +49,7 @@ import
org.apache.kafka.common.message.{CreateTopicsResponseData, DescribeGroups
import
org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult,
CreatableTopicResultSet}
import org.apache.kafka.common.message.ElectPreferredLeadersResponseData
import org.apache.kafka.common.message.LeaveGroupResponseData
+import org.apache.kafka.common.message.SaslAuthenticateResponseData
import org.apache.kafka.common.message.SaslHandshakeResponseData
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.{ListenerName, Send}
@@ -1402,13 +1403,15 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def handleSaslHandshakeRequest(request: RequestChannel.Request) {
- val responseData = new
SaslHandshakeResponseData().setErrorCode(Errors.ILLEGAL_SASL_STATE.code())
+ val responseData = new
SaslHandshakeResponseData().setErrorCode(Errors.ILLEGAL_SASL_STATE.code)
sendResponseMaybeThrottle(request, _ => new
SaslHandshakeResponse(responseData))
}
def handleSaslAuthenticateRequest(request: RequestChannel.Request) {
- sendResponseMaybeThrottle(request, _ => new
SaslAuthenticateResponse(Errors.ILLEGAL_SASL_STATE,
- "SaslAuthenticate request received after successful authentication"))
+ val responseData = new SaslAuthenticateResponseData()
+ .setErrorCode(Errors.ILLEGAL_SASL_STATE.code)
+ .setErrorMessage("SaslAuthenticate request received after successful
authentication")
+ sendResponseMaybeThrottle(request, _ => new
SaslAuthenticateResponse(responseData))
}
def handleApiVersionsRequest(request: RequestChannel.Request) {
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index b0eb7cb..1c8656d 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -28,6 +28,7 @@ import
org.apache.kafka.common.message.{CreateTopicsRequestData, DescribeGroupsR
import org.apache.kafka.common.resource.{PatternType, ResourcePattern,
ResourcePatternFilter, ResourceType => AdminResourceType}
import org.apache.kafka.common.{Node, TopicPartition}
import
org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic,
CreatableTopicSet}
+import org.apache.kafka.common.message.SaslAuthenticateRequestData
import org.apache.kafka.common.message.SaslHandshakeRequestData
import org.apache.kafka.common.metrics.{KafkaMetric, Quota, Sensor}
import org.apache.kafka.common.network.ListenerName
@@ -276,7 +277,7 @@ class RequestQuotaTest extends BaseRequestTest {
new SaslHandshakeRequest.Builder(new
SaslHandshakeRequestData().setMechanism("PLAIN"))
case ApiKeys.SASL_AUTHENTICATE =>
- new SaslAuthenticateRequest.Builder(ByteBuffer.wrap(new
Array[Byte](0)))
+ new SaslAuthenticateRequest.Builder(new
SaslAuthenticateRequestData().setAuthBytes(new Array[Byte](0)))
case ApiKeys.API_VERSIONS =>
new ApiVersionsRequest.Builder