This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new c6cbdf2  KAFKA-9577; SaslClientAuthenticator incorrectly negotiates 
SASL_HANDSHAKE version (#8142)
c6cbdf2 is described below

commit c6cbdf2be85088d043e40c794000dcdbdac008d9
Author: Lucas Bradstreet <lu...@confluent.io>
AuthorDate: Fri Feb 21 21:49:11 2020 -0800

    KAFKA-9577; SaslClientAuthenticator incorrectly negotiates SASL_HANDSHAKE 
version (#8142)
    
    The SaslClientAuthenticator incorrectly negotiates the supported 
SaslHandshakeRequest version and uses the maximum version supported by the 
broker whether or not the client supports it.
    
    This bug was exposed by a recent version bump in 
https://github.com/apache/kafka/commit/0a2569e2b9907a1217dd50ccbc320f8ad0b42fd0.
    
    This PR rolls back the recent SaslHandshake[Request,Response] bump, fixes 
the version negotiation, and adds a test to prevent anyone from accidentally 
bumping the version without a workaround such as a new ApiKey. The existing key 
will be difficult to support for clients < 2.5 due to the incorrect negotiation.
    
    Reviewers: Ron Dagostino <rdagost...@confluent.io>, Rajini Sivaram 
<rajinisiva...@googlemail.com>, Colin P. McCabe <cmcc...@apache.org>, Jason 
Gustafson <ja...@confluent.io>
---
 .../authenticator/SaslClientAuthenticator.java     | 25 +++++---
 .../common/message/SaslHandshakeRequest.json       |  7 +-
 .../common/message/SaslHandshakeResponse.json      |  7 +-
 .../authenticator/SaslAuthenticatorTest.java       | 74 +++++++++++++++++++++-
 4 files changed, 96 insertions(+), 17 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
index 6784a01..e972da1 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
@@ -133,6 +133,8 @@ public class SaslClientAuthenticator implements 
Authenticator {
     private RequestHeader currentRequestHeader;
     // Version of SaslAuthenticate request/responses
     private short saslAuthenticateVersion;
+    // Version of SaslHandshake request/responses
+    private short saslHandshakeVersion;
 
     public SaslClientAuthenticator(Map<String, ?> configs,
                                    AuthenticateCallbackHandler callbackHandler,
@@ -213,13 +215,13 @@ public class SaslClientAuthenticator implements 
Authenticator {
                 if (apiVersionsResponse == null)
                     break;
                 else {
-                    saslAuthenticateVersion(apiVersionsResponse);
+                    
setSaslAuthenticateAndHandshakeVersions(apiVersionsResponse);
                     reauthInfo.apiVersionsResponseReceivedFromBroker = 
apiVersionsResponse;
                     setSaslState(SaslState.SEND_HANDSHAKE_REQUEST);
                     // Fall through to send handshake request with the latest 
supported version
                 }
             case SEND_HANDSHAKE_REQUEST:
-                
sendHandshakeRequest(reauthInfo.apiVersionsResponseReceivedFromBroker);
+                sendHandshakeRequest(saslHandshakeVersion);
                 setSaslState(SaslState.RECEIVE_HANDSHAKE_RESPONSE);
                 break;
             case RECEIVE_HANDSHAKE_RESPONSE:
@@ -236,11 +238,11 @@ public class SaslClientAuthenticator implements 
Authenticator {
                 setSaslState(SaslState.INTERMEDIATE);
                 break;
             case REAUTH_PROCESS_ORIG_APIVERSIONS_RESPONSE:
-                
saslAuthenticateVersion(reauthInfo.apiVersionsResponseFromOriginalAuthentication);
+                
setSaslAuthenticateAndHandshakeVersions(reauthInfo.apiVersionsResponseFromOriginalAuthentication);
                 setSaslState(SaslState.REAUTH_SEND_HANDSHAKE_REQUEST); // Will 
set immediately
                 // Fall through to send handshake request with the latest 
supported version
             case REAUTH_SEND_HANDSHAKE_REQUEST:
-                
sendHandshakeRequest(reauthInfo.apiVersionsResponseFromOriginalAuthentication);
+                sendHandshakeRequest(saslHandshakeVersion);
                 
setSaslState(SaslState.REAUTH_RECEIVE_HANDSHAKE_OR_OTHER_RESPONSE);
                 break;
             case REAUTH_RECEIVE_HANDSHAKE_OR_OTHER_RESPONSE:
@@ -285,9 +287,8 @@ public class SaslClientAuthenticator implements 
Authenticator {
         }
     }
 
-    private void sendHandshakeRequest(ApiVersionsResponse apiVersionsResponse) 
throws IOException {
-        SaslHandshakeRequest handshakeRequest = createSaslHandshakeRequest(
-                
apiVersionsResponse.apiVersion(ApiKeys.SASL_HANDSHAKE.id).maxVersion());
+    private void sendHandshakeRequest(short version) throws IOException {
+        SaslHandshakeRequest handshakeRequest = 
createSaslHandshakeRequest(version);
         send(handshakeRequest.toSend(node, 
nextRequestHeader(ApiKeys.SASL_HANDSHAKE, handshakeRequest.version())));
     }
 
@@ -345,11 +346,17 @@ public class SaslClientAuthenticator implements 
Authenticator {
     }
 
     // Visible to override for testing
-    protected void saslAuthenticateVersion(ApiVersionsResponse 
apiVersionsResponse) {
+    protected void setSaslAuthenticateAndHandshakeVersions(ApiVersionsResponse 
apiVersionsResponse) {
         ApiVersionsResponseKey authenticateVersion = 
apiVersionsResponse.apiVersion(ApiKeys.SASL_AUTHENTICATE.id);
-        if (authenticateVersion != null)
+        if (authenticateVersion != null) {
             this.saslAuthenticateVersion = (short) 
Math.min(authenticateVersion.maxVersion(),
                     ApiKeys.SASL_AUTHENTICATE.latestVersion());
+        }
+        ApiVersionsResponseKey handshakeVersion = 
apiVersionsResponse.apiVersion(ApiKeys.SASL_HANDSHAKE.id);
+        if (handshakeVersion != null) {
+            this.saslHandshakeVersion = (short) 
Math.min(handshakeVersion.maxVersion(),
+                    ApiKeys.SASL_HANDSHAKE.latestVersion());
+        }
     }
 
     private void setSaslState(SaslState saslState) {
diff --git 
a/clients/src/main/resources/common/message/SaslHandshakeRequest.json 
b/clients/src/main/resources/common/message/SaslHandshakeRequest.json
index 6cb9024..f384f41 100644
--- a/clients/src/main/resources/common/message/SaslHandshakeRequest.json
+++ b/clients/src/main/resources/common/message/SaslHandshakeRequest.json
@@ -18,9 +18,10 @@
   "type": "request",
   "name": "SaslHandshakeRequest",
   // Version 1 supports SASL_AUTHENTICATE.
-  // Version 2 adds flexible version support
-  "validVersions": "0-2",
-  "flexibleVersions": "2+",
+  // NOTE: Version cannot be easily bumped due to incorrect
+  // client negotiation for clients <= 2.4.
+  // See https://issues.apache.org/jira/browse/KAFKA-9577
+  "validVersions": "0-1",
   "fields": [
     { "name": "Mechanism", "type": "string", "versions": "0+",
       "about": "The SASL mechanism chosen by the client." }
diff --git 
a/clients/src/main/resources/common/message/SaslHandshakeResponse.json 
b/clients/src/main/resources/common/message/SaslHandshakeResponse.json
index 0ef8715..039841f 100644
--- a/clients/src/main/resources/common/message/SaslHandshakeResponse.json
+++ b/clients/src/main/resources/common/message/SaslHandshakeResponse.json
@@ -18,9 +18,10 @@
   "type": "response",
   "name": "SaslHandshakeResponse",
   // Version 1 is the same as version 0.
-  // Version 2 adds flexible version support
-  "validVersions": "0-2",
-  "flexibleVersions": "2+",
+  // NOTE: Version cannot be easily bumped due to incorrect
+  // client negotiation for clients <= 2.4.
+  // See https://issues.apache.org/jira/browse/KAFKA-9577
+  "validVersions": "0-1",
   "fields": [
     { "name": "ErrorCode", "type": "int16", "versions": "0+",
       "about": "The error code, or 0 if there was no error." },
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index 82da0c3..6c7d7c8 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -677,7 +677,7 @@ public class SaslAuthenticatorTest {
      * {@link SaslServerAuthenticator} of the server prior to client 
authentication.
      */
     @Test
-    public void testApiVersionsRequestWithUnsupportedVersion() throws 
Exception {
+    public void testApiVersionsRequestWithServerUnsupportedVersion() throws 
Exception {
         short handshakeVersion = ApiKeys.SASL_HANDSHAKE.latestVersion();
         SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
         configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
@@ -715,6 +715,25 @@ public class SaslAuthenticatorTest {
     }
 
     /**
+     * Tests correct negotiation of handshake and authenticate api versions by 
having the server
+     * return a higher version than supported on the client.
+     * Note that, due to KAFKA-9577, a workaround will be required to 
effectively bump
+     * SASL_HANDSHAKE in the future.
+     */
+    @Test
+    public void testSaslUnsupportedClientVersions() throws Exception {
+        configureMechanisms("SCRAM-SHA-512", Arrays.asList("SCRAM-SHA-512"));
+
+        server = 
startServerApiVersionsUnsupportedByClient(SecurityProtocol.SASL_SSL, 
"SCRAM-SHA-512");
+        updateScramCredentialCache(TestJaasConfig.USERNAME, 
TestJaasConfig.PASSWORD);
+
+        String node = "0";
+
+        createClientConnection(SecurityProtocol.SASL_SSL, "SCRAM-SHA-512", 
node, true);
+        NetworkTestUtils.checkClientConnection(selector, "0", 100, 10);
+    }
+
+    /**
      * Tests that invalid ApiVersionRequest is handled by the server correctly 
and
      * returns an INVALID_REQUEST error.
      */
@@ -748,6 +767,16 @@ public class SaslAuthenticatorTest {
         authenticateUsingSaslPlainAndCheckConnection(node, handshakeVersion > 
0);
     }
 
+
+    @Test
+    public void testForBrokenSaslHandshakeVersionBump() {
+        assertEquals("It is not possible to easily bump SASL_HANDSHAKE schema"
+                        + " due to improper version negotiation in clients < 
2.5."
+                        + " Please see 
https://issues.apache.org/jira/browse/KAFKA-9577",
+                ApiKeys.SASL_HANDSHAKE.latestVersion(),
+                1);
+    }
+
     /**
      * Tests that valid ApiVersionRequest is handled by the server correctly 
and
      * returns an NONE error.
@@ -1730,6 +1759,47 @@ public class SaslAuthenticatorTest {
             
createClientConnectionWithoutSaslAuthenticateHeader(securityProtocol, 
saslMechanism, node);
     }
 
+    private NioEchoServer startServerApiVersionsUnsupportedByClient(final 
SecurityProtocol securityProtocol, String saslMechanism) throws Exception {
+        final ListenerName listenerName = 
ListenerName.forSecurityProtocol(securityProtocol);
+        final Map<String, ?> configs = Collections.emptyMap();
+        final JaasContext jaasContext = 
JaasContext.loadServerContext(listenerName, saslMechanism, configs);
+        final Map<String, JaasContext> jaasContexts = 
Collections.singletonMap(saslMechanism, jaasContext);
+
+        boolean isScram = ScramMechanism.isScram(saslMechanism);
+        if (isScram)
+            ScramCredentialUtils.createCache(credentialCache, 
Arrays.asList(saslMechanism));
+        SaslChannelBuilder serverChannelBuilder = new 
SaslChannelBuilder(Mode.SERVER, jaasContexts,
+                securityProtocol, listenerName, false, saslMechanism, true,
+                credentialCache, null, time, new LogContext()) {
+
+            @Override
+            protected SaslServerAuthenticator 
buildServerAuthenticator(Map<String, ?> configs,
+                                                                       
Map<String, AuthenticateCallbackHandler> callbackHandlers,
+                                                                       String 
id,
+                                                                       
TransportLayer transportLayer,
+                                                                       
Map<String, Subject> subjects,
+                                                                       
Map<String, Long> connectionsMaxReauthMsByMechanism,
+                                                                       
ChannelMetadataRegistry metadataRegistry) {
+                return new SaslServerAuthenticator(configs, callbackHandlers, 
id, subjects, null, listenerName,
+                        securityProtocol, transportLayer, 
connectionsMaxReauthMsByMechanism, metadataRegistry, time) {
+
+                    @Override
+                    protected ApiVersionsResponse apiVersionsResponse() {
+                        ApiVersionsResponseKeyCollection versionCollection = 
new ApiVersionsResponseKeyCollection(2);
+                        versionCollection.add(new 
ApiVersionsResponseKey().setApiKey(ApiKeys.SASL_HANDSHAKE.id).setMinVersion((short)
 0).setMaxVersion((short) 100));
+                        versionCollection.add(new 
ApiVersionsResponseKey().setApiKey(ApiKeys.SASL_AUTHENTICATE.id).setMinVersion((short)
 0).setMaxVersion((short) 100));
+                        return new ApiVersionsResponse(new 
ApiVersionsResponseData().setApiKeys(versionCollection));
+                    }
+                };
+            }
+        };
+        serverChannelBuilder.configure(saslServerConfigs);
+        server = new NioEchoServer(listenerName, securityProtocol, new 
TestSecurityConfig(saslServerConfigs),
+                "localhost", serverChannelBuilder, credentialCache, time);
+        server.start();
+        return server;
+    }
+
     private NioEchoServer startServerWithoutSaslAuthenticateHeader(final 
SecurityProtocol securityProtocol, String saslMechanism)
             throws Exception {
         final ListenerName listenerName = 
ListenerName.forSecurityProtocol(securityProtocol);
@@ -1821,7 +1891,7 @@ public class SaslAuthenticatorTest {
                         return buildSaslHandshakeRequest(saslMechanism, 
(short) 0);
                     }
                     @Override
-                    protected void saslAuthenticateVersion(ApiVersionsResponse 
apiVersionsResponse) {
+                    protected void 
setSaslAuthenticateAndHandshakeVersions(ApiVersionsResponse 
apiVersionsResponse) {
                         // Don't set version so that headers are disabled
                     }
                 };

Reply via email to