Repository: kafka
Updated Branches:
  refs/heads/trunk 9ebc303bb -> 47c275349


MINOR: Test SASL authorization id

Author: Rajini Sivaram <rajinisiva...@googlemail.com>

Reviewers: Ismael Juma <ism...@juma.me.uk>

Closes #3766 from rajinisivaram/MINOR-sasl


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/47c27534
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/47c27534
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/47c27534

Branch: refs/heads/trunk
Commit: 47c2753496875db2849065ad91ee03c7c842c8e9
Parents: 9ebc303
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
Authored: Thu Aug 31 12:42:08 2017 -0400
Committer: Rajini Sivaram <rajinisiva...@googlemail.com>
Committed: Thu Aug 31 12:42:08 2017 -0400

----------------------------------------------------------------------
 .../common/security/plain/PlainSaslServer.java  | 14 ++--
 .../common/security/scram/ScramSaslServer.java  | 18 +++--
 .../security/plain/PlainSaslServerTest.java     | 75 ++++++++++++++++++++
 .../security/scram/ScramSaslServerTest.java     | 74 +++++++++++++++++++
 4 files changed, 172 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/47c27534/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServer.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServer.java
 
b/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServer.java
index 6fe3b25..93faee0 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServer.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServer.java
@@ -50,7 +50,7 @@ public class PlainSaslServer implements SaslServer {
     private final JaasContext jaasContext;
 
     private boolean complete;
-    private String authorizationID;
+    private String authorizationId;
 
     public PlainSaslServer(JaasContext jaasContext) {
         this.jaasContext = jaasContext;
@@ -79,7 +79,7 @@ public class PlainSaslServer implements SaslServer {
         }
         if (tokens.length != 3)
             throw new SaslException("Invalid SASL/PLAIN response: expected 3 
tokens, got " + tokens.length);
-        authorizationID = tokens[0];
+        String authorizationIdFromClient = tokens[0];
         String username = tokens[1];
         String password = tokens[2];
 
@@ -89,14 +89,18 @@ public class PlainSaslServer implements SaslServer {
         if (password.isEmpty()) {
             throw new SaslException("Authentication failed: password not 
specified");
         }
-        if (authorizationID.isEmpty())
-            authorizationID = username;
 
         String expectedPassword = 
jaasContext.configEntryOption(JAAS_USER_PREFIX + username,
                 PlainLoginModule.class.getName());
         if (!password.equals(expectedPassword)) {
             throw new SaslException("Authentication failed: Invalid username 
or password");
         }
+
+        if (!authorizationIdFromClient.isEmpty() && 
!authorizationIdFromClient.equals(username))
+            throw new SaslException("Authentication failed: Client requested 
an authorization id that is different from username");
+
+        this.authorizationId = username;
+
         complete = true;
         return new byte[0];
     }
@@ -105,7 +109,7 @@ public class PlainSaslServer implements SaslServer {
     public String getAuthorizationID() {
         if (!complete)
             throw new IllegalStateException("Authentication exchange has not 
completed");
-        return authorizationID;
+        return authorizationId;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/47c27534/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServer.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServer.java
 
b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServer.java
index 942c299..04b9afa 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServer.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServer.java
@@ -64,7 +64,6 @@ public class ScramSaslServer implements SaslServer {
     private String username;
     private ClientFirstMessage clientFirstMessage;
     private ServerFirstMessage serverFirstMessage;
-    private String serverNonce;
     private ScramCredential scramCredential;
 
     public ScramSaslServer(ScramMechanism mechanism, Map<String, ?> props, 
CallbackHandler callbackHandler) throws NoSuchAlgorithmException {
@@ -80,7 +79,7 @@ public class ScramSaslServer implements SaslServer {
             switch (state) {
                 case RECEIVE_CLIENT_FIRST_MESSAGE:
                     this.clientFirstMessage = new ClientFirstMessage(response);
-                    serverNonce = formatter.secureRandomString();
+                    String serverNonce = formatter.secureRandomString();
                     try {
                         String saslName = clientFirstMessage.saslName();
                         this.username = formatter.username(saslName);
@@ -90,6 +89,10 @@ public class ScramSaslServer implements SaslServer {
                         this.scramCredential = 
credentialCallback.scramCredential();
                         if (scramCredential == null)
                             throw new SaslException("Authentication failed: 
Invalid user credentials");
+                        String authorizationIdFromClient = 
clientFirstMessage.authorizationId();
+                        if (!authorizationIdFromClient.isEmpty() && 
!authorizationIdFromClient.equals(username))
+                            throw new SaslException("Authentication failed: 
Client requested an authorization id that is different from username");
+
                         if (scramCredential.iterations() < 
mechanism.minIterations())
                             throw new SaslException("Iterations " + 
scramCredential.iterations() +  " is less than the minimum " + 
mechanism.minIterations() + " for " + mechanism);
                         this.serverFirstMessage = new 
ServerFirstMessage(clientFirstMessage.nonce(),
@@ -109,6 +112,7 @@ public class ScramSaslServer implements SaslServer {
                         byte[] serverKey = scramCredential.serverKey();
                         byte[] serverSignature = 
formatter.serverSignature(serverKey, clientFirstMessage, serverFirstMessage, 
clientFinalMessage);
                         ServerFinalMessage serverFinalMessage = new 
ServerFinalMessage(null, serverSignature);
+                        clearCredentials();
                         setState(State.COMPLETE);
                         return serverFinalMessage.toBytes();
                     } catch (InvalidKeyException e) {
@@ -119,6 +123,7 @@ public class ScramSaslServer implements SaslServer {
                     throw new IllegalSaslStateException("Unexpected challenge 
in Sasl server state " + state);
             }
         } catch (SaslException e) {
+            clearCredentials();
             setState(State.FAILED);
             throw e;
         }
@@ -128,8 +133,7 @@ public class ScramSaslServer implements SaslServer {
     public String getAuthorizationID() {
         if (!isComplete())
             throw new IllegalStateException("Authentication exchange has not 
completed");
-        String authzId = clientFirstMessage.authorizationId();
-        return authzId == null || authzId.length() == 0 ? username : authzId;
+        return username;
     }
 
     @Override
@@ -184,6 +188,12 @@ public class ScramSaslServer implements SaslServer {
         }
     }
 
+    private void clearCredentials() {
+        scramCredential = null;
+        clientFirstMessage = null;
+        serverFirstMessage = null;
+    }
+
     public static class ScramSaslServerFactory implements SaslServerFactory {
 
         @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/47c27534/clients/src/test/java/org/apache/kafka/common/security/plain/PlainSaslServerTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/plain/PlainSaslServerTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/plain/PlainSaslServerTest.java
new file mode 100644
index 0000000..5ca778b
--- /dev/null
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/plain/PlainSaslServerTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.plain;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.security.sasl.SaslException;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.kafka.common.security.JaasContext;
+import org.apache.kafka.common.security.authenticator.TestJaasConfig;
+
+public class PlainSaslServerTest {
+
+    private static final String USER_A = "userA";
+    private static final String PASSWORD_A = "passwordA";
+    private static final String USER_B = "userB";
+    private static final String PASSWORD_B = "passwordB";
+
+    private PlainSaslServer saslServer;
+
+    @Before
+    public void setUp() throws Exception {
+        TestJaasConfig jaasConfig = new TestJaasConfig();
+        Map<String, Object> options = new HashMap<>();
+        options.put("user_" + USER_A, PASSWORD_A);
+        options.put("user_" + USER_B, PASSWORD_B);
+        jaasConfig.addEntry("jaasContext", PlainLoginModule.class.getName(), 
options);
+        JaasContext jaasContext = new JaasContext("jaasContext", 
JaasContext.Type.SERVER, jaasConfig);
+        saslServer = new PlainSaslServer(jaasContext);
+    }
+
+    @Test
+    public void noAuthorizationIdSpecified() throws Exception {
+        byte[] nextChallenge = saslServer.evaluateResponse(saslMessage("", 
USER_A, PASSWORD_A));
+        assertEquals(0, nextChallenge.length);
+    }
+
+    @Test
+    public void authorizatonIdEqualsAuthenticationId() throws Exception {
+        byte[] nextChallenge = saslServer.evaluateResponse(saslMessage(USER_A, 
USER_A, PASSWORD_A));
+        assertEquals(0, nextChallenge.length);
+    }
+
+    @Test(expected = SaslException.class)
+    public void authorizatonIdNotEqualsAuthenticationId() throws Exception {
+        saslServer.evaluateResponse(saslMessage(USER_B, USER_A, PASSWORD_A));
+    }
+
+    private byte[] saslMessage(String authorizationId, String userName, String 
password) {
+        String nul = "\u0000";
+        String message = String.format("%s%s%s%s%s", authorizationId, nul, 
userName, nul, password);
+        return message.getBytes(StandardCharsets.UTF_8);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/47c27534/clients/src/test/java/org/apache/kafka/common/security/scram/ScramSaslServerTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/scram/ScramSaslServerTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/scram/ScramSaslServerTest.java
new file mode 100644
index 0000000..9b329d0
--- /dev/null
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/scram/ScramSaslServerTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.scram;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+
+import javax.security.sasl.SaslException;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.kafka.common.security.authenticator.CredentialCache;
+
+public class ScramSaslServerTest {
+
+    private static final String USER_A = "userA";
+    private static final String USER_B = "userB";
+
+    private ScramMechanism mechanism;
+    private ScramFormatter formatter;
+    private ScramSaslServer saslServer;
+
+    @Before
+    public void setUp() throws Exception {
+        mechanism = ScramMechanism.SCRAM_SHA_256;
+        formatter  = new ScramFormatter(mechanism);
+        CredentialCache.Cache<ScramCredential> credentialCache = new 
CredentialCache().createCache(mechanism.mechanismName(), ScramCredential.class);
+        credentialCache.put(USER_A, formatter.generateCredential("passwordA", 
4096));
+        credentialCache.put(USER_B, formatter.generateCredential("passwordB", 
4096));
+        ScramServerCallbackHandler callbackHandler = new 
ScramServerCallbackHandler(credentialCache);
+        saslServer = new ScramSaslServer(mechanism, new HashMap<String, 
Object>(), callbackHandler);
+    }
+
+    @Test
+    public void noAuthorizationIdSpecified() throws Exception {
+        byte[] nextChallenge = 
saslServer.evaluateResponse(clientFirstMessage(USER_A, null));
+        assertTrue("Next challenge is empty", nextChallenge.length > 0);
+    }
+
+    @Test
+    public void authorizatonIdEqualsAuthenticationId() throws Exception {
+        byte[] nextChallenge = 
saslServer.evaluateResponse(clientFirstMessage(USER_A, USER_A));
+        assertTrue("Next challenge is empty", nextChallenge.length > 0);
+    }
+
+    @Test(expected = SaslException.class)
+    public void authorizatonIdNotEqualsAuthenticationId() throws Exception {
+        saslServer.evaluateResponse(clientFirstMessage(USER_A, USER_B));
+    }
+
+    private byte[] clientFirstMessage(String userName, String authorizationId) 
{
+        String nonce = formatter.secureRandomString();
+        String authorizationField = authorizationId != null ? "a=" + 
authorizationId : "";
+        String firstMessage = String.format("n,%s,n=%s,r=%s", 
authorizationField, userName, nonce);
+        return firstMessage.getBytes(StandardCharsets.UTF_8);
+    }
+}

Reply via email to