This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new c6cbdf2 KAFKA-9577; SaslClientAuthenticator incorrectly negotiates
SASL_HANDSHAKE version (#8142)
c6cbdf2 is described below
commit c6cbdf2be85088d043e40c794000dcdbdac008d9
Author: Lucas Bradstreet <[email protected]>
AuthorDate: Fri Feb 21 21:49:11 2020 -0800
KAFKA-9577; SaslClientAuthenticator incorrectly negotiates SASL_HANDSHAKE
version (#8142)
The SaslClientAuthenticator incorrectly negotiates the SaslHandshakeRequest
version: it always uses the maximum version supported by the broker, whether
or not the client itself supports that version.
This bug was exposed by a recent version bump in
https://github.com/apache/kafka/commit/0a2569e2b9907a1217dd50ccbc320f8ad0b42fd0.
This PR rolls back the recent SaslHandshake[Request,Response] bump, fixes
the version negotiation, and adds a test to prevent anyone from accidentally
bumping the version without a workaround such as a new ApiKey. Bumping the
version under the existing key will be difficult to support for clients < 2.5
due to their incorrect negotiation.
Reviewers: Ron Dagostino <[email protected]>, Rajini Sivaram
<[email protected]>, Colin P. McCabe <[email protected]>, Jason
Gustafson <[email protected]>
---
.../authenticator/SaslClientAuthenticator.java | 25 +++++---
.../common/message/SaslHandshakeRequest.json | 7 +-
.../common/message/SaslHandshakeResponse.json | 7 +-
.../authenticator/SaslAuthenticatorTest.java | 74 +++++++++++++++++++++-
4 files changed, 96 insertions(+), 17 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
index 6784a01..e972da1 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
@@ -133,6 +133,8 @@ public class SaslClientAuthenticator implements
Authenticator {
private RequestHeader currentRequestHeader;
// Version of SaslAuthenticate request/responses
private short saslAuthenticateVersion;
+ // Version of SaslHandshake request/responses
+ private short saslHandshakeVersion;
public SaslClientAuthenticator(Map<String, ?> configs,
AuthenticateCallbackHandler callbackHandler,
@@ -213,13 +215,13 @@ public class SaslClientAuthenticator implements
Authenticator {
if (apiVersionsResponse == null)
break;
else {
- saslAuthenticateVersion(apiVersionsResponse);
+
setSaslAuthenticateAndHandshakeVersions(apiVersionsResponse);
reauthInfo.apiVersionsResponseReceivedFromBroker =
apiVersionsResponse;
setSaslState(SaslState.SEND_HANDSHAKE_REQUEST);
// Fall through to send handshake request with the latest
supported version
}
case SEND_HANDSHAKE_REQUEST:
-
sendHandshakeRequest(reauthInfo.apiVersionsResponseReceivedFromBroker);
+ sendHandshakeRequest(saslHandshakeVersion);
setSaslState(SaslState.RECEIVE_HANDSHAKE_RESPONSE);
break;
case RECEIVE_HANDSHAKE_RESPONSE:
@@ -236,11 +238,11 @@ public class SaslClientAuthenticator implements
Authenticator {
setSaslState(SaslState.INTERMEDIATE);
break;
case REAUTH_PROCESS_ORIG_APIVERSIONS_RESPONSE:
-
saslAuthenticateVersion(reauthInfo.apiVersionsResponseFromOriginalAuthentication);
+
setSaslAuthenticateAndHandshakeVersions(reauthInfo.apiVersionsResponseFromOriginalAuthentication);
setSaslState(SaslState.REAUTH_SEND_HANDSHAKE_REQUEST); // Will
set immediately
// Fall through to send handshake request with the latest
supported version
case REAUTH_SEND_HANDSHAKE_REQUEST:
-
sendHandshakeRequest(reauthInfo.apiVersionsResponseFromOriginalAuthentication);
+ sendHandshakeRequest(saslHandshakeVersion);
setSaslState(SaslState.REAUTH_RECEIVE_HANDSHAKE_OR_OTHER_RESPONSE);
break;
case REAUTH_RECEIVE_HANDSHAKE_OR_OTHER_RESPONSE:
@@ -285,9 +287,8 @@ public class SaslClientAuthenticator implements
Authenticator {
}
}
- private void sendHandshakeRequest(ApiVersionsResponse apiVersionsResponse)
throws IOException {
- SaslHandshakeRequest handshakeRequest = createSaslHandshakeRequest(
-
apiVersionsResponse.apiVersion(ApiKeys.SASL_HANDSHAKE.id).maxVersion());
+ private void sendHandshakeRequest(short version) throws IOException {
+ SaslHandshakeRequest handshakeRequest =
createSaslHandshakeRequest(version);
send(handshakeRequest.toSend(node,
nextRequestHeader(ApiKeys.SASL_HANDSHAKE, handshakeRequest.version())));
}
@@ -345,11 +346,17 @@ public class SaslClientAuthenticator implements
Authenticator {
}
// Visible to override for testing
- protected void saslAuthenticateVersion(ApiVersionsResponse
apiVersionsResponse) {
+ protected void setSaslAuthenticateAndHandshakeVersions(ApiVersionsResponse
apiVersionsResponse) {
ApiVersionsResponseKey authenticateVersion =
apiVersionsResponse.apiVersion(ApiKeys.SASL_AUTHENTICATE.id);
- if (authenticateVersion != null)
+ if (authenticateVersion != null) {
this.saslAuthenticateVersion = (short)
Math.min(authenticateVersion.maxVersion(),
ApiKeys.SASL_AUTHENTICATE.latestVersion());
+ }
+ ApiVersionsResponseKey handshakeVersion =
apiVersionsResponse.apiVersion(ApiKeys.SASL_HANDSHAKE.id);
+ if (handshakeVersion != null) {
+ this.saslHandshakeVersion = (short)
Math.min(handshakeVersion.maxVersion(),
+ ApiKeys.SASL_HANDSHAKE.latestVersion());
+ }
}
private void setSaslState(SaslState saslState) {
diff --git
a/clients/src/main/resources/common/message/SaslHandshakeRequest.json
b/clients/src/main/resources/common/message/SaslHandshakeRequest.json
index 6cb9024..f384f41 100644
--- a/clients/src/main/resources/common/message/SaslHandshakeRequest.json
+++ b/clients/src/main/resources/common/message/SaslHandshakeRequest.json
@@ -18,9 +18,10 @@
"type": "request",
"name": "SaslHandshakeRequest",
// Version 1 supports SASL_AUTHENTICATE.
- // Version 2 adds flexible version support
- "validVersions": "0-2",
- "flexibleVersions": "2+",
+ // NOTE: Version cannot be easily bumped due to incorrect
+ // client negotiation for clients <= 2.4.
+ // See https://issues.apache.org/jira/browse/KAFKA-9577
+ "validVersions": "0-1",
"fields": [
{ "name": "Mechanism", "type": "string", "versions": "0+",
"about": "The SASL mechanism chosen by the client." }
diff --git
a/clients/src/main/resources/common/message/SaslHandshakeResponse.json
b/clients/src/main/resources/common/message/SaslHandshakeResponse.json
index 0ef8715..039841f 100644
--- a/clients/src/main/resources/common/message/SaslHandshakeResponse.json
+++ b/clients/src/main/resources/common/message/SaslHandshakeResponse.json
@@ -18,9 +18,10 @@
"type": "response",
"name": "SaslHandshakeResponse",
// Version 1 is the same as version 0.
- // Version 2 adds flexible version support
- "validVersions": "0-2",
- "flexibleVersions": "2+",
+ // NOTE: Version cannot be easily bumped due to incorrect
+ // client negotiation for clients <= 2.4.
+ // See https://issues.apache.org/jira/browse/KAFKA-9577
+ "validVersions": "0-1",
"fields": [
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error." },
diff --git
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index 82da0c3..6c7d7c8 100644
---
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -677,7 +677,7 @@ public class SaslAuthenticatorTest {
* {@link SaslServerAuthenticator} of the server prior to client
authentication.
*/
@Test
- public void testApiVersionsRequestWithUnsupportedVersion() throws
Exception {
+ public void testApiVersionsRequestWithServerUnsupportedVersion() throws
Exception {
short handshakeVersion = ApiKeys.SASL_HANDSHAKE.latestVersion();
SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
@@ -715,6 +715,25 @@ public class SaslAuthenticatorTest {
}
/**
+ * Tests correct negotiation of handshake and authenticate api versions by
having the server
+ * return a higher version than supported on the client.
+ * Note, that due to KAFKA-9577 this will require a workaround to
effectively bump
+ * SASL_HANDSHAKE in the future.
+ */
+ @Test
+ public void testSaslUnsupportedClientVersions() throws Exception {
+ configureMechanisms("SCRAM-SHA-512", Arrays.asList("SCRAM-SHA-512"));
+
+ server =
startServerApiVersionsUnsupportedByClient(SecurityProtocol.SASL_SSL,
"SCRAM-SHA-512");
+ updateScramCredentialCache(TestJaasConfig.USERNAME,
TestJaasConfig.PASSWORD);
+
+ String node = "0";
+
+ createClientConnection(SecurityProtocol.SASL_SSL, "SCRAM-SHA-512",
node, true);
+ NetworkTestUtils.checkClientConnection(selector, "0", 100, 10);
+ }
+
+ /**
* Tests that invalid ApiVersionRequest is handled by the server correctly
and
* returns an INVALID_REQUEST error.
*/
@@ -748,6 +767,16 @@ public class SaslAuthenticatorTest {
authenticateUsingSaslPlainAndCheckConnection(node, handshakeVersion >
0);
}
+
+ @Test
+ public void testForBrokenSaslHandshakeVersionBump() {
+ assertEquals("It is not possible to easily bump SASL_HANDSHAKE schema"
+ + " due to improper version negotiation in clients <
2.5."
+ + " Please see
https://issues.apache.org/jira/browse/KAFKA-9577",
+ ApiKeys.SASL_HANDSHAKE.latestVersion(),
+ 1);
+ }
+
/**
* Tests that valid ApiVersionRequest is handled by the server correctly
and
* returns an NONE error.
@@ -1730,6 +1759,47 @@ public class SaslAuthenticatorTest {
createClientConnectionWithoutSaslAuthenticateHeader(securityProtocol,
saslMechanism, node);
}
+ private NioEchoServer startServerApiVersionsUnsupportedByClient(final
SecurityProtocol securityProtocol, String saslMechanism) throws Exception {
+ final ListenerName listenerName =
ListenerName.forSecurityProtocol(securityProtocol);
+ final Map<String, ?> configs = Collections.emptyMap();
+ final JaasContext jaasContext =
JaasContext.loadServerContext(listenerName, saslMechanism, configs);
+ final Map<String, JaasContext> jaasContexts =
Collections.singletonMap(saslMechanism, jaasContext);
+
+ boolean isScram = ScramMechanism.isScram(saslMechanism);
+ if (isScram)
+ ScramCredentialUtils.createCache(credentialCache,
Arrays.asList(saslMechanism));
+ SaslChannelBuilder serverChannelBuilder = new
SaslChannelBuilder(Mode.SERVER, jaasContexts,
+ securityProtocol, listenerName, false, saslMechanism, true,
+ credentialCache, null, time, new LogContext()) {
+
+ @Override
+ protected SaslServerAuthenticator
buildServerAuthenticator(Map<String, ?> configs,
+
Map<String, AuthenticateCallbackHandler> callbackHandlers,
+ String
id,
+
TransportLayer transportLayer,
+
Map<String, Subject> subjects,
+
Map<String, Long> connectionsMaxReauthMsByMechanism,
+
ChannelMetadataRegistry metadataRegistry) {
+ return new SaslServerAuthenticator(configs, callbackHandlers,
id, subjects, null, listenerName,
+ securityProtocol, transportLayer,
connectionsMaxReauthMsByMechanism, metadataRegistry, time) {
+
+ @Override
+ protected ApiVersionsResponse apiVersionsResponse() {
+ ApiVersionsResponseKeyCollection versionCollection =
new ApiVersionsResponseKeyCollection(2);
+ versionCollection.add(new
ApiVersionsResponseKey().setApiKey(ApiKeys.SASL_HANDSHAKE.id).setMinVersion((short)
0).setMaxVersion((short) 100));
+ versionCollection.add(new
ApiVersionsResponseKey().setApiKey(ApiKeys.SASL_AUTHENTICATE.id).setMinVersion((short)
0).setMaxVersion((short) 100));
+ return new ApiVersionsResponse(new
ApiVersionsResponseData().setApiKeys(versionCollection));
+ }
+ };
+ }
+ };
+ serverChannelBuilder.configure(saslServerConfigs);
+ server = new NioEchoServer(listenerName, securityProtocol, new
TestSecurityConfig(saslServerConfigs),
+ "localhost", serverChannelBuilder, credentialCache, time);
+ server.start();
+ return server;
+ }
+
private NioEchoServer startServerWithoutSaslAuthenticateHeader(final
SecurityProtocol securityProtocol, String saslMechanism)
throws Exception {
final ListenerName listenerName =
ListenerName.forSecurityProtocol(securityProtocol);
@@ -1821,7 +1891,7 @@ public class SaslAuthenticatorTest {
return buildSaslHandshakeRequest(saslMechanism,
(short) 0);
}
@Override
- protected void saslAuthenticateVersion(ApiVersionsResponse
apiVersionsResponse) {
+ protected void
setSaslAuthenticateAndHandshakeVersions(ApiVersionsResponse
apiVersionsResponse) {
// Don't set version so that headers are disabled
}
};