This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch 2.5 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push: new c6cbdf2 KAFKA-9577; SaslClientAuthenticator incorrectly negotiates SASL_HANDSHAKE version (#8142) c6cbdf2 is described below commit c6cbdf2be85088d043e40c794000dcdbdac008d9 Author: Lucas Bradstreet <lu...@confluent.io> AuthorDate: Fri Feb 21 21:49:11 2020 -0800 KAFKA-9577; SaslClientAuthenticator incorrectly negotiates SASL_HANDSHAKE version (#8142) The SaslClientAuthenticator incorrectly negotiates supported SaslHandshakeRequest version and uses the maximum version supported by the broker whether or not the client supports it. This bug was exposed by a recent version bump in https://github.com/apache/kafka/commit/0a2569e2b9907a1217dd50ccbc320f8ad0b42fd0. This PR rolls back the recent SaslHandshake[Request,Response] bump, fixes the version negotiation, and adds a test to prevent anyone from accidentally bumping the version without a workaround such as a new ApiKey. The existing key will be difficult to support for clients < 2.5 due to the incorrect negotiation. Reviewers: Ron Dagostino <rdagost...@confluent.io>, Rajini Sivaram <rajinisiva...@googlemail.com>, Colin P. McCabe <cmcc...@apache.org>, Jason Gustafson <ja...@confluent.io> --- .../authenticator/SaslClientAuthenticator.java | 25 +++++--- .../common/message/SaslHandshakeRequest.json | 7 +- .../common/message/SaslHandshakeResponse.json | 7 +- .../authenticator/SaslAuthenticatorTest.java | 74 +++++++++++++++++++++- 4 files changed, 96 insertions(+), 17 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java index 6784a01..e972da1 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java @@ -133,6 +133,8 @@ public class SaslClientAuthenticator implements Authenticator { private RequestHeader currentRequestHeader; // Version of SaslAuthenticate request/responses private short saslAuthenticateVersion; + // Version of SaslHandshake request/responses + private short saslHandshakeVersion; public SaslClientAuthenticator(Map<String, ?> configs, AuthenticateCallbackHandler callbackHandler, @@ -213,13 +215,13 @@ public class SaslClientAuthenticator implements Authenticator { if (apiVersionsResponse == null) break; else { - saslAuthenticateVersion(apiVersionsResponse); + setSaslAuthenticateAndHandshakeVersions(apiVersionsResponse); reauthInfo.apiVersionsResponseReceivedFromBroker = apiVersionsResponse; setSaslState(SaslState.SEND_HANDSHAKE_REQUEST); // Fall through to send handshake request with the latest supported version } case SEND_HANDSHAKE_REQUEST: - sendHandshakeRequest(reauthInfo.apiVersionsResponseReceivedFromBroker); + sendHandshakeRequest(saslHandshakeVersion); setSaslState(SaslState.RECEIVE_HANDSHAKE_RESPONSE); break; case RECEIVE_HANDSHAKE_RESPONSE: @@ -236,11 +238,11 @@ public class SaslClientAuthenticator implements Authenticator { setSaslState(SaslState.INTERMEDIATE); break; case REAUTH_PROCESS_ORIG_APIVERSIONS_RESPONSE: - saslAuthenticateVersion(reauthInfo.apiVersionsResponseFromOriginalAuthentication); + setSaslAuthenticateAndHandshakeVersions(reauthInfo.apiVersionsResponseFromOriginalAuthentication); setSaslState(SaslState.REAUTH_SEND_HANDSHAKE_REQUEST); // Will set immediately // Fall through to send handshake request with the latest supported version case REAUTH_SEND_HANDSHAKE_REQUEST: - sendHandshakeRequest(reauthInfo.apiVersionsResponseFromOriginalAuthentication); + sendHandshakeRequest(saslHandshakeVersion); setSaslState(SaslState.REAUTH_RECEIVE_HANDSHAKE_OR_OTHER_RESPONSE); break; case REAUTH_RECEIVE_HANDSHAKE_OR_OTHER_RESPONSE: @@ -285,9 +287,8 @@ public class SaslClientAuthenticator implements Authenticator { } } - private void sendHandshakeRequest(ApiVersionsResponse apiVersionsResponse) throws IOException { - SaslHandshakeRequest handshakeRequest = createSaslHandshakeRequest( - apiVersionsResponse.apiVersion(ApiKeys.SASL_HANDSHAKE.id).maxVersion()); + private void sendHandshakeRequest(short version) throws IOException { + SaslHandshakeRequest handshakeRequest = createSaslHandshakeRequest(version); send(handshakeRequest.toSend(node, nextRequestHeader(ApiKeys.SASL_HANDSHAKE, handshakeRequest.version()))); } @@ -345,11 +346,17 @@ public class SaslClientAuthenticator implements Authenticator { } // Visible to override for testing - protected void saslAuthenticateVersion(ApiVersionsResponse apiVersionsResponse) { + protected void setSaslAuthenticateAndHandshakeVersions(ApiVersionsResponse apiVersionsResponse) { ApiVersionsResponseKey authenticateVersion = apiVersionsResponse.apiVersion(ApiKeys.SASL_AUTHENTICATE.id); - if (authenticateVersion != null) + if (authenticateVersion != null) { this.saslAuthenticateVersion = (short) Math.min(authenticateVersion.maxVersion(), ApiKeys.SASL_AUTHENTICATE.latestVersion()); + } + ApiVersionsResponseKey handshakeVersion = apiVersionsResponse.apiVersion(ApiKeys.SASL_HANDSHAKE.id); + if (handshakeVersion != null) { + this.saslHandshakeVersion = (short) Math.min(handshakeVersion.maxVersion(), + ApiKeys.SASL_HANDSHAKE.latestVersion()); + } } private void setSaslState(SaslState saslState) { diff --git a/clients/src/main/resources/common/message/SaslHandshakeRequest.json b/clients/src/main/resources/common/message/SaslHandshakeRequest.json index 6cb9024..f384f41 100644 --- a/clients/src/main/resources/common/message/SaslHandshakeRequest.json +++ b/clients/src/main/resources/common/message/SaslHandshakeRequest.json @@ -18,9 +18,10 @@ "type": "request", "name": "SaslHandshakeRequest", // Version 1 supports SASL_AUTHENTICATE. - // Version 2 adds flexible version support - "validVersions": "0-2", - "flexibleVersions": "2+", + // NOTE: Version cannot be easily bumped due to incorrect + // client negotiation for clients <= 2.4. + // See https://issues.apache.org/jira/browse/KAFKA-9577 + "validVersions": "0-1", "fields": [ { "name": "Mechanism", "type": "string", "versions": "0+", "about": "The SASL mechanism chosen by the client." } diff --git a/clients/src/main/resources/common/message/SaslHandshakeResponse.json b/clients/src/main/resources/common/message/SaslHandshakeResponse.json index 0ef8715..039841f 100644 --- a/clients/src/main/resources/common/message/SaslHandshakeResponse.json +++ b/clients/src/main/resources/common/message/SaslHandshakeResponse.json @@ -18,9 +18,10 @@ "type": "response", "name": "SaslHandshakeResponse", // Version 1 is the same as version 0. - // Version 2 adds flexible version support - "validVersions": "0-2", - "flexibleVersions": "2+", + // NOTE: Version cannot be easily bumped due to incorrect + // client negotiation for clients <= 2.4. + // See https://issues.apache.org/jira/browse/KAFKA-9577 + "validVersions": "0-1", "fields": [ { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or 0 if there was no error." }, diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java index 82da0c3..6c7d7c8 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java @@ -677,7 +677,7 @@ public class SaslAuthenticatorTest { * {@link SaslServerAuthenticator} of the server prior to client authentication. */ @Test - public void testApiVersionsRequestWithUnsupportedVersion() throws Exception { + public void testApiVersionsRequestWithServerUnsupportedVersion() throws Exception { short handshakeVersion = ApiKeys.SASL_HANDSHAKE.latestVersion(); SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT; configureMechanisms("PLAIN", Arrays.asList("PLAIN")); @@ -715,6 +715,25 @@ public class SaslAuthenticatorTest { } /** + * Tests correct negotiation of handshake and authenticate api versions by having the server + * return a higher version than supported on the client. + * Note, that due to KAFKA-9577 this will require a workaround to effectively bump + * SASL_HANDSHAKE in the future. + */ + @Test + public void testSaslUnsupportedClientVersions() throws Exception { + configureMechanisms("SCRAM-SHA-512", Arrays.asList("SCRAM-SHA-512")); + + server = startServerApiVersionsUnsupportedByClient(SecurityProtocol.SASL_SSL, "SCRAM-SHA-512"); + updateScramCredentialCache(TestJaasConfig.USERNAME, TestJaasConfig.PASSWORD); + + String node = "0"; + + createClientConnection(SecurityProtocol.SASL_SSL, "SCRAM-SHA-512", node, true); + NetworkTestUtils.checkClientConnection(selector, "0", 100, 10); + } + + /** * Tests that invalid ApiVersionRequest is handled by the server correctly and * returns an INVALID_REQUEST error. */ @@ -748,6 +767,16 @@ public class SaslAuthenticatorTest { authenticateUsingSaslPlainAndCheckConnection(node, handshakeVersion > 0); } + + @Test + public void testForBrokenSaslHandshakeVersionBump() { + assertEquals("It is not possible to easily bump SASL_HANDSHAKE schema" + + " due to improper version negotiation in clients < 2.5." + + " Please see https://issues.apache.org/jira/browse/KAFKA-9577", + ApiKeys.SASL_HANDSHAKE.latestVersion(), + 1); + } + /** * Tests that valid ApiVersionRequest is handled by the server correctly and * returns an NONE error. @@ -1730,6 +1759,47 @@ public class SaslAuthenticatorTest { createClientConnectionWithoutSaslAuthenticateHeader(securityProtocol, saslMechanism, node); } + private NioEchoServer startServerApiVersionsUnsupportedByClient(final SecurityProtocol securityProtocol, String saslMechanism) throws Exception { + final ListenerName listenerName = ListenerName.forSecurityProtocol(securityProtocol); + final Map<String, ?> configs = Collections.emptyMap(); + final JaasContext jaasContext = JaasContext.loadServerContext(listenerName, saslMechanism, configs); + final Map<String, JaasContext> jaasContexts = Collections.singletonMap(saslMechanism, jaasContext); + + boolean isScram = ScramMechanism.isScram(saslMechanism); + if (isScram) + ScramCredentialUtils.createCache(credentialCache, Arrays.asList(saslMechanism)); + SaslChannelBuilder serverChannelBuilder = new SaslChannelBuilder(Mode.SERVER, jaasContexts, + securityProtocol, listenerName, false, saslMechanism, true, + credentialCache, null, time, new LogContext()) { + + @Override + protected SaslServerAuthenticator buildServerAuthenticator(Map<String, ?> configs, + Map<String, AuthenticateCallbackHandler> callbackHandlers, + String id, + TransportLayer transportLayer, + Map<String, Subject> subjects, + Map<String, Long> connectionsMaxReauthMsByMechanism, + ChannelMetadataRegistry metadataRegistry) { + return new SaslServerAuthenticator(configs, callbackHandlers, id, subjects, null, listenerName, + securityProtocol, transportLayer, connectionsMaxReauthMsByMechanism, metadataRegistry, time) { + + @Override + protected ApiVersionsResponse apiVersionsResponse() { + ApiVersionsResponseKeyCollection versionCollection = new ApiVersionsResponseKeyCollection(2); + versionCollection.add(new ApiVersionsResponseKey().setApiKey(ApiKeys.SASL_HANDSHAKE.id).setMinVersion((short) 0).setMaxVersion((short) 100)); + versionCollection.add(new ApiVersionsResponseKey().setApiKey(ApiKeys.SASL_AUTHENTICATE.id).setMinVersion((short) 0).setMaxVersion((short) 100)); + return new ApiVersionsResponse(new ApiVersionsResponseData().setApiKeys(versionCollection)); + } + }; + } + }; + serverChannelBuilder.configure(saslServerConfigs); + server = new NioEchoServer(listenerName, securityProtocol, new TestSecurityConfig(saslServerConfigs), + "localhost", serverChannelBuilder, credentialCache, time); + server.start(); + return server; + } + private NioEchoServer startServerWithoutSaslAuthenticateHeader(final SecurityProtocol securityProtocol, String saslMechanism) throws Exception { final ListenerName listenerName = ListenerName.forSecurityProtocol(securityProtocol); @@ -1821,7 +1891,7 @@ public class SaslAuthenticatorTest { return buildSaslHandshakeRequest(saslMechanism, (short) 0); } @Override - protected void saslAuthenticateVersion(ApiVersionsResponse apiVersionsResponse) { + protected void setSaslAuthenticateAndHandshakeVersions(ApiVersionsResponse apiVersionsResponse) { // Don't set version so that headers are disabled } };