Repository: kafka
Updated Branches:
  refs/heads/trunk f7b1add68 -> 3b5d88feb


http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerCallbackHandler.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerCallbackHandler.java
 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerCallbackHandler.java
index 52a34fc..72e06a2 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerCallbackHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerCallbackHandler.java
@@ -21,7 +21,6 @@ import java.util.Map;
 
 import org.apache.kafka.common.security.JaasContext;
 import org.apache.kafka.common.security.auth.AuthCallbackHandler;
-import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,7 +30,6 @@ import 
javax.security.auth.callback.UnsupportedCallbackException;
 import javax.security.sasl.AuthorizeCallback;
 import javax.security.sasl.RealmCallback;
 
-import org.apache.kafka.common.security.kerberos.KerberosName;
 import org.apache.kafka.common.network.Mode;
 
 /**
@@ -42,12 +40,10 @@ import org.apache.kafka.common.network.Mode;
  */
 public class SaslServerCallbackHandler implements AuthCallbackHandler {
     private static final Logger LOG = 
LoggerFactory.getLogger(SaslServerCallbackHandler.class);
-    private final KerberosShortNamer kerberosShortNamer;
     private final JaasContext jaasContext;
 
-    public SaslServerCallbackHandler(JaasContext jaasContext, 
KerberosShortNamer kerberosNameParser) throws IOException {
+    public SaslServerCallbackHandler(JaasContext jaasContext) throws 
IOException {
         this.jaasContext = jaasContext;
-        this.kerberosShortNamer = kerberosNameParser;
     }
 
     @Override
@@ -77,19 +73,10 @@ public class SaslServerCallbackHandler implements 
AuthCallbackHandler {
     private void handleAuthorizeCallback(AuthorizeCallback ac) {
         String authenticationID = ac.getAuthenticationID();
         String authorizationID = ac.getAuthorizationID();
-
-        LOG.info("Successfully authenticated client: authenticationID={}; 
authorizationID={}.", authenticationID,
-                authorizationID);
+        LOG.info("Successfully authenticated client: authenticationID={}; 
authorizationID={}.",
+                authenticationID, authorizationID);
         ac.setAuthorized(true);
-
-        KerberosName kerberosName = KerberosName.parse(authenticationID);
-        try {
-            String userName = kerberosShortNamer.shortName(kerberosName);
-            LOG.info("Setting authorizedID: {}", userName);
-            ac.setAuthorizedID(userName);
-        } catch (IOException e) {
-            LOG.error("Failed to set name for '{}' based on Kerberos 
authentication rules.", kerberosName, e);
-        }
+        ac.setAuthorizedID(authenticationID);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java 
b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
index c7905bb..6b2fe74 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.security.ssl;
 import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 import org.apache.kafka.common.network.Mode;
 import org.apache.kafka.common.config.types.Password;
 
@@ -97,7 +98,7 @@ public class SslFactory implements Configurable {
 
         String clientAuthConfig = clientAuthConfigOverride;
         if (clientAuthConfig == null)
-            clientAuthConfig = (String) 
configs.get(SslConfigs.SSL_CLIENT_AUTH_CONFIG);
+            clientAuthConfig = (String) 
configs.get(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG);
         if (clientAuthConfig != null) {
             if (clientAuthConfig.equals("required"))
                 this.needClientAuth = true;

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/utils/SecurityUtils.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/utils/SecurityUtils.java 
b/clients/src/main/java/org/apache/kafka/common/utils/SecurityUtils.java
new file mode 100644
index 0000000..9c9bd44
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/utils/SecurityUtils.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+
+public class SecurityUtils {
+
+    public static KafkaPrincipal parseKafkaPrincipal(String str) {
+        if (str == null || str.isEmpty()) {
+            throw new IllegalArgumentException("expected a string in format 
principalType:principalName but got " + str);
+        }
+
+        String[] split = str.split(":", 2);
+
+        if (split.length != 2) {
+            throw new IllegalArgumentException("expected a string in format 
principalType:principalName but got " + str);
+        }
+
+        return new KafkaPrincipal(split[0], split[1]);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java 
b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 2025107..1137045 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.common.utils;
 
-import java.text.DecimalFormat;
 import org.apache.kafka.common.KafkaException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,6 +44,7 @@ import java.nio.file.Path;
 import java.nio.file.SimpleFileVisitor;
 import java.nio.file.StandardCopyOption;
 import java.nio.file.attribute.BasicFileAttributes;
+import java.text.DecimalFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -719,7 +719,7 @@ public class Utils {
             try {
                 closeable.close();
             } catch (Throwable t) {
-                log.warn("Failed to close {}", name, t);
+                log.warn("Failed to close {} with type {}", name, 
closeable.getClass().getName(), t);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/test/java/org/apache/kafka/common/network/CertStores.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/CertStores.java 
b/clients/src/test/java/org/apache/kafka/common/network/CertStores.java
index b6cc1d4..916e619 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/CertStores.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/CertStores.java
@@ -16,14 +16,14 @@
  */
 package org.apache.kafka.common.network;
 
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.test.TestSslUtils;
+
 import java.io.File;
 import java.net.InetAddress;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.kafka.common.config.SslConfigs;
-import org.apache.kafka.test.TestSslUtils;
-
 public class CertStores {
 
     private final Map<String, Object> sslConfig;
@@ -45,8 +45,6 @@ public class CertStores {
         Mode mode = server ? Mode.SERVER : Mode.CLIENT;
         File truststoreFile = File.createTempFile(name + "TS", ".jks");
         sslConfig = TestSslUtils.createSslConfig(!server, true, mode, 
truststoreFile, name, commonName, certBuilder);
-        if (server)
-            sslConfig.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, 
Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
     }
 
     public Map<String, Object> getTrustingConfig(CertStores truststoreConfig) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java
 
b/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java
new file mode 100644
index 0000000..de210e7
--- /dev/null
+++ 
b/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.network;
+
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+import org.apache.kafka.common.security.auth.AuthenticationContext;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
+import org.apache.kafka.common.security.auth.PlaintextAuthenticationContext;
+import org.apache.kafka.common.security.auth.PrincipalBuilder;
+import org.easymock.EasyMock;
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.security.Principal;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class ChannelBuildersTest {
+
+    @Test
+    @SuppressWarnings("deprecation")
+    public void testCreateOldPrincipalBuilder() throws Exception {
+        TransportLayer transportLayer = EasyMock.mock(TransportLayer.class);
+        Authenticator authenticator = EasyMock.mock(Authenticator.class);
+
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, 
OldPrincipalBuilder.class);
+        KafkaPrincipalBuilder builder = 
ChannelBuilders.createPrincipalBuilder(configs, transportLayer, authenticator, 
null);
+
+        // test old principal builder is properly configured and delegated to
+        assertTrue(OldPrincipalBuilder.configured);
+
+        // test delegation
+        KafkaPrincipal principal = builder.build(new 
PlaintextAuthenticationContext(InetAddress.getLocalHost()));
+        assertEquals(OldPrincipalBuilder.PRINCIPAL_NAME, principal.getName());
+        assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType());
+    }
+
+    @Test
+    public void testCreateConfigurableKafkaPrincipalBuilder() {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, 
ConfigurableKafkaPrincipalBuilder.class);
+        KafkaPrincipalBuilder builder = 
ChannelBuilders.createPrincipalBuilder(configs, null, null, null);
+        assertTrue(builder instanceof ConfigurableKafkaPrincipalBuilder);
+        assertTrue(((ConfigurableKafkaPrincipalBuilder) builder).configured);
+    }
+
+    @SuppressWarnings("deprecation")
+    public static class OldPrincipalBuilder implements PrincipalBuilder {
+        private static boolean configured = false;
+        private static final String PRINCIPAL_NAME = "bob";
+
+        @Override
+        public void configure(Map<String, ?> configs) {
+            configured = true;
+        }
+
+        @Override
+        public Principal buildPrincipal(TransportLayer transportLayer, 
Authenticator authenticator) throws KafkaException {
+            return new Principal() {
+                @Override
+                public String getName() {
+                    return PRINCIPAL_NAME;
+                }
+            };
+        }
+
+        @Override
+        public void close() throws KafkaException {
+
+        }
+    }
+
+    public static class ConfigurableKafkaPrincipalBuilder implements 
KafkaPrincipalBuilder, Configurable {
+        private boolean configured = false;
+
+        @Override
+        public void configure(Map<String, ?> configs) {
+            configured = true;
+        }
+
+        @Override
+        public KafkaPrincipal build(AuthenticationContext context) {
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java 
b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
index 5683068..35f1377 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
@@ -16,27 +16,9 @@
  */
 package org.apache.kafka.common.network;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import java.nio.channels.SelectionKey;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
 import org.apache.kafka.common.memory.MemoryPool;
 import org.apache.kafka.common.memory.SimpleMemoryPool;
 import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.security.ssl.SslFactory;
 import org.apache.kafka.common.utils.LogContext;
@@ -46,6 +28,23 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
 /**
  * A set of tests for the selector. These use a test harness that runs a 
simple socket server that echos back responses.
  */
@@ -58,7 +57,6 @@ public class SslSelectorTest extends SelectorTest {
         File trustStoreFile = File.createTempFile("truststore", ".jks");
 
         Map<String, Object> sslServerConfigs = 
TestSslUtils.createSslConfig(false, true, Mode.SERVER, trustStoreFile, 
"server");
-        sslServerConfigs.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, 
Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
         this.server = new EchoServer(SecurityProtocol.SSL, sslServerConfigs);
         this.server.start();
         this.time = new MockTime();
@@ -180,7 +178,6 @@ public class SslSelectorTest extends SelectorTest {
         //the initial channel builder is for clients, we need a server one
         File trustStoreFile = File.createTempFile("truststore", ".jks");
         Map<String, Object> sslServerConfigs = 
TestSslUtils.createSslConfig(false, true, Mode.SERVER, trustStoreFile, 
"server");
-        sslServerConfigs.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, 
Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
         channelBuilder = new SslChannelBuilder(Mode.SERVER);
         channelBuilder.configure(sslServerConfigs);
         selector = new Selector(NetworkReceive.UNLIMITED, 5000, metrics, time, 
"MetricGroup", 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
 
b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
index 2ddd349..459a4af 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -16,34 +16,15 @@
  */
 package org.apache.kafka.common.network;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLParameters;
-
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 import org.apache.kafka.common.config.types.Password;
 import org.apache.kafka.common.memory.MemoryPool;
-import org.apache.kafka.common.security.TestSecurityConfig;
-import org.apache.kafka.common.security.ssl.SslFactory;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.security.TestSecurityConfig;
+import org.apache.kafka.common.security.ssl.SslFactory;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
@@ -53,6 +34,25 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLParameters;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 /**
  * Tests for the SSL transport layer. These use a test harness that runs a 
simple socket server that echos back responses.
  */
@@ -260,7 +260,7 @@ public class SslTransportLayerTest {
     @Test
     public void testClientAuthenticationRequiredValidProvided() throws 
Exception {
         String node = "0";
-        sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "required");
+        sslServerConfigs.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, 
"required");
         server = createEchoServer(SecurityProtocol.SSL);
         createSelector(sslClientConfigs);
         InetSocketAddress addr = new InetSocketAddress("localhost", 
server.port());
@@ -276,8 +276,8 @@ public class SslTransportLayerTest {
     public void testListenerConfigOverride() throws Exception {
         String node = "0";
         ListenerName clientListenerName = new ListenerName("client");
-        sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "required");
-        sslServerConfigs.put(clientListenerName.configPrefix() + 
SslConfigs.SSL_CLIENT_AUTH_CONFIG, "none");
+        sslServerConfigs.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, 
"required");
+        sslServerConfigs.put(clientListenerName.configPrefix() + 
BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "none");
 
         // `client` listener is not configured at this point, so client auth 
should be required
         server = createEchoServer(SecurityProtocol.SSL);
@@ -317,7 +317,7 @@ public class SslTransportLayerTest {
     public void testClientAuthenticationRequiredUntrustedProvided() throws 
Exception {
         String node = "0";
         sslServerConfigs = serverCertStores.getUntrustingConfig();
-        sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "required");
+        sslServerConfigs.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, 
"required");
         server = createEchoServer(SecurityProtocol.SSL);
         createSelector(sslClientConfigs);
         InetSocketAddress addr = new InetSocketAddress("localhost", 
server.port());
@@ -333,7 +333,7 @@ public class SslTransportLayerTest {
     @Test
     public void testClientAuthenticationRequiredNotProvided() throws Exception 
{
         String node = "0";
-        sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "required");
+        sslServerConfigs.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, 
"required");
         server = createEchoServer(SecurityProtocol.SSL);
         
         sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
@@ -354,7 +354,7 @@ public class SslTransportLayerTest {
     public void testClientAuthenticationDisabledUntrustedProvided() throws 
Exception {
         String node = "0";
         sslServerConfigs = serverCertStores.getUntrustingConfig();
-        sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "none");
+        sslServerConfigs.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, 
"none");
         server = createEchoServer(SecurityProtocol.SSL);
         createSelector(sslClientConfigs);
         InetSocketAddress addr = new InetSocketAddress("localhost", 
server.port());
@@ -370,7 +370,7 @@ public class SslTransportLayerTest {
     @Test
     public void testClientAuthenticationDisabledNotProvided() throws Exception 
{
         String node = "0";
-        sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "none");
+        sslServerConfigs.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, 
"none");
         server = createEchoServer(SecurityProtocol.SSL);
         
         sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
@@ -390,7 +390,7 @@ public class SslTransportLayerTest {
     @Test
     public void testClientAuthenticationRequestedValidProvided() throws 
Exception {
         String node = "0";
-        sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "requested");
+        sslServerConfigs.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, 
"requested");
         server = createEchoServer(SecurityProtocol.SSL);
         createSelector(sslClientConfigs);
         InetSocketAddress addr = new InetSocketAddress("localhost", 
server.port());
@@ -406,7 +406,7 @@ public class SslTransportLayerTest {
     @Test
     public void testClientAuthenticationRequestedNotProvided() throws 
Exception {
         String node = "0";
-        sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "requested");
+        sslServerConfigs.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, 
"requested");
         server = createEchoServer(SecurityProtocol.SSL);
         
         sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG);

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/test/java/org/apache/kafka/common/security/TestSecurityConfig.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/TestSecurityConfig.java
 
b/clients/src/test/java/org/apache/kafka/common/security/TestSecurityConfig.java
index 4cd5f5d..05294cf 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/TestSecurityConfig.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/TestSecurityConfig.java
@@ -20,19 +20,19 @@ import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
-import org.apache.kafka.common.config.SaslConfigs;
-import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 
 import java.util.Map;
 
 public class TestSecurityConfig extends AbstractConfig {
     private static final ConfigDef CONFIG = new ConfigDef()
-            .define(SslConfigs.SSL_CLIENT_AUTH_CONFIG, Type.STRING, null, 
Importance.MEDIUM,
-                    SslConfigs.SSL_CLIENT_AUTH_DOC)
-            .define(SaslConfigs.SASL_ENABLED_MECHANISMS, Type.LIST, 
SaslConfigs.DEFAULT_SASL_ENABLED_MECHANISMS,
-                    Importance.MEDIUM, SaslConfigs.SASL_ENABLED_MECHANISMS_DOC)
-            .define(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Type.CLASS, 
SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS,
-                    Importance.MEDIUM, SslConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
+            .define(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, Type.STRING, 
null, Importance.MEDIUM,
+                    BrokerSecurityConfigs.SSL_CLIENT_AUTH_DOC)
+            .define(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, 
Type.LIST,
+                    BrokerSecurityConfigs.DEFAULT_SASL_ENABLED_MECHANISMS,
+                    Importance.MEDIUM, 
BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_DOC)
+            .define(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, 
Type.CLASS,
+                    null, Importance.MEDIUM, 
BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
             .withClientSslSupport()
             .withClientSaslSupport();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java
new file mode 100644
index 0000000..fdf9e3c
--- /dev/null
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.auth;
+
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.network.Authenticator;
+import org.apache.kafka.common.network.TransportLayer;
+import org.apache.kafka.common.protocol.SecurityProtocol;
+import 
org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder;
+import org.apache.kafka.common.security.kerberos.KerberosName;
+import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
+import org.apache.kafka.common.security.scram.ScramMechanism;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockSupport;
+import org.junit.Test;
+
+import javax.net.ssl.SSLSession;
+import javax.security.sasl.SaslServer;
+import java.net.InetAddress;
+import java.security.Principal;
+
+import static org.junit.Assert.assertEquals;
+
+public class DefaultKafkaPrincipalBuilderTest extends EasyMockSupport {
+
+    @Test
+    @SuppressWarnings("deprecation")
+    public void testUseOldPrincipalBuilderForPlaintextIfProvided() throws 
Exception {
+        TransportLayer transportLayer = mock(TransportLayer.class);
+        Authenticator authenticator = mock(Authenticator.class);
+        PrincipalBuilder oldPrincipalBuilder = mock(PrincipalBuilder.class);
+
+        EasyMock.expect(oldPrincipalBuilder.buildPrincipal(transportLayer, 
authenticator))
+                .andReturn(new DummyPrincipal("foo"));
+        oldPrincipalBuilder.close();
+        EasyMock.expectLastCall();
+
+        replayAll();
+
+        DefaultKafkaPrincipalBuilder builder = 
DefaultKafkaPrincipalBuilder.fromOldPrincipalBuilder(authenticator,
+                transportLayer, oldPrincipalBuilder, null);
+
+        KafkaPrincipal principal = builder.build(new 
PlaintextAuthenticationContext(InetAddress.getLocalHost()));
+        assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType());
+        assertEquals("foo", principal.getName());
+
+        builder.close();
+
+        verifyAll();
+    }
+
+    @Test
+    public void testReturnAnonymousPrincipalForPlaintext() throws Exception {
+        DefaultKafkaPrincipalBuilder builder = new 
DefaultKafkaPrincipalBuilder(null);
+        assertEquals(KafkaPrincipal.ANONYMOUS, builder.build(new 
PlaintextAuthenticationContext(InetAddress.getLocalHost())));
+    }
+
+    @Test
+    @SuppressWarnings("deprecation")
+    public void testUseOldPrincipalBuilderForSslIfProvided() throws Exception {
+        TransportLayer transportLayer = mock(TransportLayer.class);
+        Authenticator authenticator = mock(Authenticator.class);
+        PrincipalBuilder oldPrincipalBuilder = mock(PrincipalBuilder.class);
+        SSLSession session = mock(SSLSession.class);
+
+        EasyMock.expect(oldPrincipalBuilder.buildPrincipal(transportLayer, 
authenticator))
+                .andReturn(new DummyPrincipal("foo"));
+        oldPrincipalBuilder.close();
+        EasyMock.expectLastCall();
+
+        replayAll();
+
+        DefaultKafkaPrincipalBuilder builder = 
DefaultKafkaPrincipalBuilder.fromOldPrincipalBuilder(authenticator,
+                transportLayer, oldPrincipalBuilder, null);
+
+        KafkaPrincipal principal = builder.build(new 
SslAuthenticationContext(session, InetAddress.getLocalHost()));
+        assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType());
+        assertEquals("foo", principal.getName());
+
+        builder.close();
+
+        verifyAll();
+    }
+
+    @Test
+    public void testUseSessionPeerPrincipalForSsl() throws Exception {
+        SSLSession session = mock(SSLSession.class);
+
+        EasyMock.expect(session.getPeerPrincipal()).andReturn(new 
DummyPrincipal("foo"));
+
+        replayAll();
+
+        DefaultKafkaPrincipalBuilder builder = new 
DefaultKafkaPrincipalBuilder(null);
+
+        KafkaPrincipal principal = builder.build(new 
SslAuthenticationContext(session, InetAddress.getLocalHost()));
+        assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType());
+        assertEquals("foo", principal.getName());
+
+        verifyAll();
+    }
+
+    @Test
+    public void testPrincipalBuilderScram() throws Exception {
+        SaslServer server = mock(SaslServer.class);
+
+        
EasyMock.expect(server.getMechanismName()).andReturn(ScramMechanism.SCRAM_SHA_256.mechanismName());
+        EasyMock.expect(server.getAuthorizationID()).andReturn("foo");
+
+        replayAll();
+
+        DefaultKafkaPrincipalBuilder builder = new 
DefaultKafkaPrincipalBuilder(null);
+
+        KafkaPrincipal principal = builder.build(new 
SaslAuthenticationContext(server,
+                SecurityProtocol.SASL_PLAINTEXT, InetAddress.getLocalHost()));
+        assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType());
+        assertEquals("foo", principal.getName());
+
+        verifyAll();
+    }
+
+    @Test
+    public void testPrincipalBuilderGssapi() throws Exception {
+        SaslServer server = mock(SaslServer.class);
+        KerberosShortNamer kerberosShortNamer = mock(KerberosShortNamer.class);
+
+        
EasyMock.expect(server.getMechanismName()).andReturn(SaslConfigs.GSSAPI_MECHANISM);
+        
EasyMock.expect(server.getAuthorizationID()).andReturn("foo/h...@realm.com");
+        
EasyMock.expect(kerberosShortNamer.shortName(EasyMock.anyObject(KerberosName.class)))
+                .andReturn("foo");
+
+        replayAll();
+
+        DefaultKafkaPrincipalBuilder builder = new 
DefaultKafkaPrincipalBuilder(kerberosShortNamer);
+
+        KafkaPrincipal principal = builder.build(new 
SaslAuthenticationContext(server,
+                SecurityProtocol.SASL_PLAINTEXT, InetAddress.getLocalHost()));
+        assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType());
+        assertEquals("foo", principal.getName());
+
+        verifyAll();
+    }
+
+    private static class DummyPrincipal implements Principal {
+        private final String name;
+
+        private DummyPrincipal(String name) {
+            this.name = name;
+        }
+
+        @Override
+        public String getName() {
+            return name;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/test/java/org/apache/kafka/common/security/auth/KafkaPrincipalTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/auth/KafkaPrincipalTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/auth/KafkaPrincipalTest.java
index db905dd..f204a88 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/auth/KafkaPrincipalTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/auth/KafkaPrincipalTest.java
@@ -22,15 +22,6 @@ import org.junit.Test;
 public class KafkaPrincipalTest {
 
     @Test
-    public void testPrincipalNameCanContainSeparator() {
-        String name = "name" + KafkaPrincipal.SEPARATOR + "with" + 
KafkaPrincipal.SEPARATOR + "in" + KafkaPrincipal.SEPARATOR + "it";
-
-        KafkaPrincipal principal = 
KafkaPrincipal.fromString(KafkaPrincipal.USER_TYPE + KafkaPrincipal.SEPARATOR + 
name);
-        Assert.assertEquals(KafkaPrincipal.USER_TYPE, 
principal.getPrincipalType());
-        Assert.assertEquals(name, principal.getName());
-    }
-
-    @Test
     public void testEqualsAndHashCode() {
         String name = "KafkaUser";
         KafkaPrincipal principal1 = new 
KafkaPrincipal(KafkaPrincipal.USER_TYPE, name);

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index 6b0eca3..2dd7db9 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.security.authenticator;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 import org.apache.kafka.common.config.types.Password;
 import org.apache.kafka.common.network.CertStores;
 import org.apache.kafka.common.network.ChannelBuilder;
@@ -634,7 +635,7 @@ public class SaslAuthenticatorTest {
     public void testDynamicJaasConfiguration() throws Exception {
         SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
         saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
-        saslServerConfigs.put(SaslConfigs.SASL_ENABLED_MECHANISMS, 
Arrays.asList("PLAIN"));
+        
saslServerConfigs.put(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, 
Arrays.asList("PLAIN"));
         Map<String, Object> serverOptions = new HashMap<>();
         serverOptions.put("user_user1", "user1-secret");
         serverOptions.put("user_user2", "user2-secret");
@@ -676,7 +677,7 @@ public class SaslAuthenticatorTest {
     public void testJaasConfigurationForListener() throws Exception {
         SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
         saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
-        saslServerConfigs.put(SaslConfigs.SASL_ENABLED_MECHANISMS, 
Arrays.asList("PLAIN"));
+        
saslServerConfigs.put(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, 
Arrays.asList("PLAIN"));
 
         TestJaasConfig staticJaasConfig = new TestJaasConfig();
 
@@ -782,7 +783,7 @@ public class SaslAuthenticatorTest {
 
     private TestJaasConfig configureMechanisms(String clientMechanism, 
List<String> serverMechanisms) {
         saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, clientMechanism);
-        saslServerConfigs.put(SaslConfigs.SASL_ENABLED_MECHANISMS, 
serverMechanisms);
+        
saslServerConfigs.put(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, 
serverMechanisms);
         return TestJaasConfig.createConfiguration(clientMechanism, 
serverMechanisms);
     }
 
@@ -860,7 +861,7 @@ public class SaslAuthenticatorTest {
 
     @SuppressWarnings("unchecked")
     private void updateScramCredentialCache(String username, String password) 
throws NoSuchAlgorithmException {
-        for (String mechanism : (List<String>) 
saslServerConfigs.get(SaslConfigs.SASL_ENABLED_MECHANISMS)) {
+        for (String mechanism : (List<String>) 
saslServerConfigs.get(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG)) {
             ScramMechanism scramMechanism = 
ScramMechanism.forMechanismName(mechanism);
             if (scramMechanism != null) {
                 ScramFormatter formatter = new ScramFormatter(scramMechanism);

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
index d37c206..9d8be8d 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.common.security.authenticator;
 
-import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 import org.apache.kafka.common.errors.IllegalSaslStateException;
 import org.apache.kafka.common.network.InvalidReceiveException;
 import org.apache.kafka.common.network.ListenerName;
@@ -26,7 +26,6 @@ import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.security.JaasContext;
-import org.apache.kafka.common.security.auth.PrincipalBuilder;
 import org.apache.kafka.common.security.plain.PlainLoginModule;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
@@ -35,7 +34,6 @@ import org.junit.Test;
 
 import javax.security.auth.Subject;
 import java.io.IOException;
-import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.HashMap;
@@ -49,11 +47,10 @@ public class SaslServerAuthenticatorTest {
 
     @Test(expected = InvalidReceiveException.class)
     public void testOversizeRequest() throws IOException {
-        SaslServerAuthenticator authenticator = setupAuthenticator();
         TransportLayer transportLayer = EasyMock.mock(TransportLayer.class);
-        PrincipalBuilder principalBuilder = null; // SASL authenticator does 
not currently use principal builder
-        Map<String, ?> configs = 
Collections.singletonMap(SaslConfigs.SASL_ENABLED_MECHANISMS,
+        Map<String, ?> configs = 
Collections.singletonMap(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG,
                 Collections.singletonList(SCRAM_SHA_256.mechanismName()));
+        SaslServerAuthenticator authenticator = setupAuthenticator(configs, 
transportLayer);
 
         final Capture<ByteBuffer> size = EasyMock.newCapture();
         
EasyMock.expect(transportLayer.read(EasyMock.capture(size))).andAnswer(new 
IAnswer<Integer>() {
@@ -66,17 +63,15 @@ public class SaslServerAuthenticatorTest {
 
         EasyMock.replay(transportLayer);
 
-        authenticator.configure(transportLayer, principalBuilder, configs);
         authenticator.authenticate();
     }
 
     @Test
     public void testUnexpectedRequestType() throws IOException {
-        SaslServerAuthenticator authenticator = setupAuthenticator();
         TransportLayer transportLayer = EasyMock.mock(TransportLayer.class);
-        PrincipalBuilder principalBuilder = null; // SASL authenticator does 
not currently use principal builder
-        Map<String, ?> configs = 
Collections.singletonMap(SaslConfigs.SASL_ENABLED_MECHANISMS,
+        Map<String, ?> configs = 
Collections.singletonMap(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG,
                 Collections.singletonList(SCRAM_SHA_256.mechanismName()));
+        SaslServerAuthenticator authenticator = setupAuthenticator(configs, 
transportLayer);
 
         final RequestHeader header = new RequestHeader(ApiKeys.METADATA, 
(short) 0, "clientId", 13243);
         final Struct headerStruct = header.toStruct();
@@ -102,7 +97,6 @@ public class SaslServerAuthenticatorTest {
 
         EasyMock.replay(transportLayer);
 
-        authenticator.configure(transportLayer, principalBuilder, configs);
         try {
             authenticator.authenticate();
             fail("Expected authenticate() to raise an exception");
@@ -111,13 +105,13 @@ public class SaslServerAuthenticatorTest {
         }
     }
 
-    private SaslServerAuthenticator setupAuthenticator() throws IOException {
+    private SaslServerAuthenticator setupAuthenticator(Map<String, ?> configs, 
TransportLayer transportLayer) throws IOException {
         TestJaasConfig jaasConfig = new TestJaasConfig();
         jaasConfig.addEntry("jaasContext", PlainLoginModule.class.getName(), 
new HashMap<String, Object>());
         JaasContext jaasContext = new JaasContext("jaasContext", 
JaasContext.Type.SERVER, jaasConfig);
         Subject subject = new Subject();
-        return new SaslServerAuthenticator("node", jaasContext, subject, null, 
InetAddress.getLocalHost(),
-                new CredentialCache(), new ListenerName("ssl"), 
SecurityProtocol.SASL_SSL);
+        return new SaslServerAuthenticator(configs, "node", jaasContext, 
subject, null, new CredentialCache(),
+                new ListenerName("ssl"), SecurityProtocol.SASL_SSL, 
transportLayer);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/test/java/org/apache/kafka/common/utils/SecurityUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/utils/SecurityUtilsTest.java 
b/clients/src/test/java/org/apache/kafka/common/utils/SecurityUtilsTest.java
new file mode 100644
index 0000000..273c13a
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/utils/SecurityUtilsTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class SecurityUtilsTest {
+
+    @Test
+    public void testPrincipalNameCanContainSeparator() {
+        String name = "name:with:separator:in:it";
+        KafkaPrincipal principal = 
SecurityUtils.parseKafkaPrincipal(KafkaPrincipal.USER_TYPE + ":" + name);
+        assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType());
+        assertEquals(name, principal.getName());
+    }
+
+    @Test
+    public void testParseKafkaPrincipalWithNonUserPrincipalType() {
+        String name = "foo";
+        String principalType = "Group";
+        KafkaPrincipal principal = 
SecurityUtils.parseKafkaPrincipal(principalType + ":" + name);
+        assertEquals(principalType, principal.getPrincipalType());
+        assertEquals(name, principal.getName());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala 
b/core/src/main/scala/kafka/network/SocketServer.scala
index ffe75a8..de2bcde 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -31,14 +31,11 @@ import kafka.metrics.KafkaMetricsGroup
 import kafka.security.CredentialProvider
 import kafka.server.KafkaConfig
 import kafka.utils._
-import org.apache.kafka.common.errors.InvalidRequestException
 import org.apache.kafka.common.memory.{MemoryPool, SimpleMemoryPool}
 import org.apache.kafka.common.metrics._
 import org.apache.kafka.common.metrics.stats.Rate
 import org.apache.kafka.common.network.{ChannelBuilder, ChannelBuilders, 
KafkaChannel, ListenerName, Selectable, Send, Selector => KSelector}
 import org.apache.kafka.common.protocol.SecurityProtocol
-import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.common.protocol.types.SchemaException
 import org.apache.kafka.common.requests.{RequestContext, RequestHeader}
 import org.apache.kafka.common.utils.{KafkaThread, LogContext, Time}
 
@@ -547,10 +544,9 @@ private[kafka] class Processor(val id: Int,
       try {
         openOrClosingChannel(receive.source) match {
           case Some(channel) =>
-            val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
channel.principal.getName)
             val header = RequestHeader.parse(receive.payload)
             val context = new RequestContext(header, receive.source, 
channel.socketAddress,
-              principal, listenerName, securityProtocol)
+              channel.principal, listenerName, securityProtocol)
             val req = new RequestChannel.Request(processor = id, context = 
context,
               startTimeNanos = time.nanoseconds, memoryPool, receive.payload)
             requestChannel.sendRequest(req)

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index de6559c..33eaf48 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -29,6 +29,7 @@ import kafka.utils.CoreUtils
 import kafka.utils.Implicits._
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.common.config.ConfigDef.ValidList
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, 
ConfigException, SaslConfigs, SslConfigs, TopicConfig}
 import org.apache.kafka.common.metrics.Sensor
 import org.apache.kafka.common.network.ListenerName
@@ -193,7 +194,6 @@ object Defaults {
   val MetricRecordingLevel = Sensor.RecordingLevel.INFO.toString()
 
   /** ********* SSL configuration ***********/
-  val PrincipalBuilderClass = SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS
   val SslProtocol = SslConfigs.DEFAULT_SSL_PROTOCOL
   val SslEnabledProtocols = SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS
   val SslKeystoreType = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE
@@ -212,7 +212,7 @@ object Defaults {
   val SaslKerberosTicketRenewWindowFactor = 
SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR
   val SaslKerberosTicketRenewJitter = 
SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER
   val SaslKerberosMinTimeBeforeRelogin = 
SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN
-  val SaslKerberosPrincipalToLocalRules = 
SaslConfigs.DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES
+  val SaslKerberosPrincipalToLocalRules = 
BrokerSecurityConfigs.DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES
 }
 
 object KafkaConfig {
@@ -375,8 +375,10 @@ object KafkaConfig {
   val MetricReporterClassesProp: String = 
CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG
   val MetricRecordingLevelProp: String = 
CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG
 
+  /** ******** Common Security Configuration *************/
+  val PrincipalBuilderClassProp = 
BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG
+
   /** ********* SSL Configuration ****************/
-  val PrincipalBuilderClassProp = SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG
   val SslProtocolProp = SslConfigs.SSL_PROTOCOL_CONFIG
   val SslProviderProp = SslConfigs.SSL_PROVIDER_CONFIG
   val SslCipherSuitesProp = SslConfigs.SSL_CIPHER_SUITES_CONFIG
@@ -392,17 +394,17 @@ object KafkaConfig {
   val SslTrustManagerAlgorithmProp = 
SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG
   val SslEndpointIdentificationAlgorithmProp = 
SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG
   val SslSecureRandomImplementationProp = 
SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG
-  val SslClientAuthProp = SslConfigs.SSL_CLIENT_AUTH_CONFIG
+  val SslClientAuthProp = BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG
 
   /** ********* SASL Configuration ****************/
   val SaslMechanismInterBrokerProtocolProp = 
"sasl.mechanism.inter.broker.protocol"
-  val SaslEnabledMechanismsProp = SaslConfigs.SASL_ENABLED_MECHANISMS
+  val SaslEnabledMechanismsProp = 
BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG
   val SaslKerberosServiceNameProp = SaslConfigs.SASL_KERBEROS_SERVICE_NAME
   val SaslKerberosKinitCmdProp = SaslConfigs.SASL_KERBEROS_KINIT_CMD
   val SaslKerberosTicketRenewWindowFactorProp = 
SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR
   val SaslKerberosTicketRenewJitterProp = 
SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER
   val SaslKerberosMinTimeBeforeReloginProp = 
SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN
-  val SaslKerberosPrincipalToLocalRulesProp = 
SaslConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES
+  val SaslKerberosPrincipalToLocalRulesProp = 
BrokerSecurityConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_CONFIG
 
   /* Documentation */
   /** ********* Zookeeper Configuration ***********/
@@ -636,8 +638,10 @@ object KafkaConfig {
   val MetricReporterClassesDoc = 
CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC
   val MetricRecordingLevelDoc = CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC
 
+  /** ******** Common Security Configuration *************/
+  val PrincipalBuilderClassDoc = 
BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_DOC
+
   /** ********* SSL Configuration ****************/
-  val PrincipalBuilderClassDoc = SslConfigs.PRINCIPAL_BUILDER_CLASS_DOC
   val SslProtocolDoc = SslConfigs.SSL_PROTOCOL_DOC
   val SslProviderDoc = SslConfigs.SSL_PROVIDER_DOC
   val SslCipherSuitesDoc = SslConfigs.SSL_CIPHER_SUITES_DOC
@@ -653,7 +657,7 @@ object KafkaConfig {
   val SslTrustManagerAlgorithmDoc = SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC
   val SslEndpointIdentificationAlgorithmDoc = 
SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC
   val SslSecureRandomImplementationDoc = 
SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_DOC
-  val SslClientAuthDoc = SslConfigs.SSL_CLIENT_AUTH_DOC
+  val SslClientAuthDoc = BrokerSecurityConfigs.SSL_CLIENT_AUTH_DOC
 
   /** ********* Sasl Configuration ****************/
   val SaslMechanismInterBrokerProtocolDoc = "SASL mechanism used for 
inter-broker communication. Default is GSSAPI."
@@ -663,7 +667,7 @@ object KafkaConfig {
   val SaslKerberosTicketRenewWindowFactorDoc = 
SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC
   val SaslKerberosTicketRenewJitterDoc = 
SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER_DOC
   val SaslKerberosMinTimeBeforeReloginDoc = 
SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC
-  val SaslKerberosPrincipalToLocalRulesDoc = 
SaslConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_DOC
+  val SaslKerberosPrincipalToLocalRulesDoc = 
BrokerSecurityConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_DOC
 
   private val configDef = {
     import ConfigDef.Importance._
@@ -834,7 +838,7 @@ object KafkaConfig {
       .define(ReplicationQuotaWindowSizeSecondsProp, INT, 
Defaults.ReplicationQuotaWindowSizeSeconds, atLeast(1), LOW, 
ReplicationQuotaWindowSizeSecondsDoc)
 
       /** ********* SSL Configuration ****************/
-      .define(PrincipalBuilderClassProp, CLASS, 
Defaults.PrincipalBuilderClass, MEDIUM, PrincipalBuilderClassDoc)
+      .define(PrincipalBuilderClassProp, CLASS, null, MEDIUM, 
PrincipalBuilderClassDoc)
       .define(SslProtocolProp, STRING, Defaults.SslProtocol, MEDIUM, 
SslProtocolDoc)
       .define(SslProviderProp, STRING, null, MEDIUM, SslProviderDoc)
       .define(SslEnabledProtocolsProp, LIST, Defaults.SslEnabledProtocols, 
MEDIUM, SslEnabledProtocolsDoc)

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala 
b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index 3376d23..bbb3249 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -69,7 +69,7 @@ abstract class EndToEndAuthorizationTest extends 
IntegrationTestHarness with Sas
   val topicWildcard = "*"
   val part = 0
   val tp = new TopicPartition(topic, part)
-  val topicAndPartition = new TopicAndPartition(topic, part)
+  val topicAndPartition = TopicAndPartition(topic, part)
   val clientPrincipal: String
   val kafkaPrincipal: String
 
@@ -154,8 +154,8 @@ abstract class EndToEndAuthorizationTest extends 
IntegrationTestHarness with Sas
     * Starts MiniKDC and only then sets up the parent trait.
     */
   @Before
-  override def setUp {
-    super.setUp
+  override def setUp() {
+    super.setUp()
     AclCommand.main(topicBrokerReadAclArgs)
     servers.foreach { s =>
       TestUtils.waitAndVerifyAcls(TopicBrokerReadAcl, s.apis.authorizer.get, 
new Resource(Topic, "*"))
@@ -177,9 +177,9 @@ abstract class EndToEndAuthorizationTest extends 
IntegrationTestHarness with Sas
     * Closes MiniKDC last when tearing down.
     */
   @After
-  override def tearDown {
+  override def tearDown() {
     consumers.foreach(_.wakeup())
-    super.tearDown
+    super.tearDown()
     closeSasl()
   }
 
@@ -240,7 +240,7 @@ abstract class EndToEndAuthorizationTest extends 
IntegrationTestHarness with Sas
     */
   @Test(expected = classOf[KafkaException])
   def testNoConsumeWithoutDescribeAclViaAssign(): Unit = {
-    noConsumeWithoutDescribeAclSetup
+    noConsumeWithoutDescribeAclSetup()
     consumers.head.assign(List(tp).asJava)
     // the exception is expected when the consumer attempts to lookup offsets
     consumeRecords(this.consumers.head)
@@ -248,7 +248,7 @@ abstract class EndToEndAuthorizationTest extends 
IntegrationTestHarness with Sas
   
   @Test(expected = classOf[TimeoutException])
   def testNoConsumeWithoutDescribeAclViaSubscribe(): Unit = {
-    noConsumeWithoutDescribeAclSetup
+    noConsumeWithoutDescribeAclSetup()
     consumers.head.subscribe(List(topic).asJava)
     // this should timeout since the consumer will not be able to fetch any 
metadata for the topic
     consumeRecords(this.consumers.head, timeout = 3000)
@@ -273,7 +273,7 @@ abstract class EndToEndAuthorizationTest extends 
IntegrationTestHarness with Sas
   
   @Test
   def testNoConsumeWithDescribeAclViaAssign(): Unit = {
-    noConsumeWithDescribeAclSetup
+    noConsumeWithDescribeAclSetup()
     consumers.head.assign(List(tp).asJava)
 
     try {
@@ -287,7 +287,7 @@ abstract class EndToEndAuthorizationTest extends 
IntegrationTestHarness with Sas
   
   @Test
   def testNoConsumeWithDescribeAclViaSubscribe(): Unit = {
-    noConsumeWithDescribeAclSetup
+    noConsumeWithDescribeAclSetup()
     consumers.head.subscribe(List(topic).asJava)
 
     try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala 
b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index b8dc57b..4b27239 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -26,6 +26,7 @@ import java.util.Properties
 import org.apache.kafka.clients.producer.KafkaProducer
 import kafka.server.KafkaConfig
 import kafka.integration.KafkaServerTestHarness
+import org.apache.kafka.common.network.ListenerName
 import org.junit.{After, Before}
 
 import scala.collection.mutable.Buffer
@@ -46,14 +47,21 @@ abstract class IntegrationTestHarness extends 
KafkaServerTestHarness {
   val consumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
   val producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
 
+  protected def interBrokerListenerName: ListenerName = listenerName
+
   override def generateConfigs = {
     val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnect, 
interBrokerSecurityProtocol = Some(securityProtocol),
       trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, 
logDirCount = logDirCount)
     cfgs.foreach { config =>
-      config.setProperty(KafkaConfig.ListenersProp, 
s"${listenerName.value}://localhost:${TestUtils.RandomPort}")
       config.remove(KafkaConfig.InterBrokerSecurityProtocolProp)
-      config.setProperty(KafkaConfig.InterBrokerListenerNameProp, 
listenerName.value)
-      config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, 
s"${listenerName.value}:${securityProtocol.name}")
+      config.setProperty(KafkaConfig.InterBrokerListenerNameProp, 
interBrokerListenerName.value)
+
+      val listenerNames = Set(listenerName, interBrokerListenerName)
+      val listeners = listenerNames.map(listenerName => 
s"${listenerName.value}://localhost:${TestUtils.RandomPort}").mkString(",")
+      val listenerSecurityMap = listenerNames.map(listenerName => 
s"${listenerName.value}:${securityProtocol.name}").mkString(",")
+
+      config.setProperty(KafkaConfig.ListenersProp, listeners)
+      config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, 
listenerSecurityMap)
     }
     cfgs.foreach(_ ++= serverConfig)
     cfgs.map(KafkaConfig.fromProps)

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala
 
b/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala
new file mode 100644
index 0000000..2aeccb4
--- /dev/null
+++ 
b/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.api
+
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.security.auth.{AuthenticationContext, 
KafkaPrincipal, KafkaPrincipalBuilder, PlaintextAuthenticationContext}
+import org.junit.Before
+
+// This test case uses a separate listener for client and inter-broker 
communication, from
+// which we derive corresponding principals
+object PlaintextEndToEndAuthorizationTest {
+  class TestClientPrincipalBuilder extends KafkaPrincipalBuilder {
+    override def build(context: AuthenticationContext): KafkaPrincipal = {
+      context match {
+        case ctx: PlaintextAuthenticationContext if ctx.clientAddress != null 
=>
+          new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "client")
+        case _ =>
+          KafkaPrincipal.ANONYMOUS
+      }
+    }
+  }
+
+  class TestServerPrincipalBuilder extends KafkaPrincipalBuilder {
+    override def build(context: AuthenticationContext): KafkaPrincipal = {
+      context match {
+        case ctx: PlaintextAuthenticationContext =>
+          new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "server")
+        case _ =>
+          KafkaPrincipal.ANONYMOUS
+      }
+    }
+  }
+}
+
+class PlaintextEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
+  import PlaintextEndToEndAuthorizationTest.{TestClientPrincipalBuilder, 
TestServerPrincipalBuilder}
+
+  override protected def securityProtocol = SecurityProtocol.PLAINTEXT
+  override protected def listenerName: ListenerName = new 
ListenerName("CLIENT")
+  override protected def interBrokerListenerName: ListenerName = new 
ListenerName("SERVER")
+
+  this.serverConfig.setProperty("listener.name.client." + 
BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG,
+    classOf[TestClientPrincipalBuilder].getName)
+  this.serverConfig.setProperty("listener.name.server." + 
BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG,
+    classOf[TestServerPrincipalBuilder].getName)
+  override val clientPrincipal = "client"
+  override val kafkaPrincipal = "server"
+
+  @Before
+  override def setUp() {
+    startSasl(jaasSections(List.empty, None, ZkSasl))
+    super.setUp()
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
 
b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
index 975ca4c..c28de3e 100644
--- 
a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
@@ -17,13 +17,38 @@
 package kafka.api
 
 import kafka.utils.{JaasTestUtils, TestUtils}
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
+import org.apache.kafka.common.security.auth.{AuthenticationContext, 
KafkaPrincipal, KafkaPrincipalBuilder, SaslAuthenticationContext}
 import org.junit.Test
 
+object SaslPlainSslEndToEndAuthorizationTest {
+  class TestPrincipalBuilder extends KafkaPrincipalBuilder {
+
+    override def build(context: AuthenticationContext): KafkaPrincipal = {
+      context match {
+        case ctx: SaslAuthenticationContext =>
+          ctx.server.getAuthorizationID match {
+            case JaasTestUtils.KafkaPlainAdmin =>
+              new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "admin")
+            case JaasTestUtils.KafkaPlainUser =>
+              new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user")
+            case _ =>
+              KafkaPrincipal.ANONYMOUS
+          }
+      }
+    }
+  }
+}
+
 class SaslPlainSslEndToEndAuthorizationTest extends 
SaslEndToEndAuthorizationTest {
+  import SaslPlainSslEndToEndAuthorizationTest.TestPrincipalBuilder
+
+  
this.serverConfig.setProperty(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG,
 classOf[TestPrincipalBuilder].getName)
+
   override protected def kafkaClientSaslMechanism = "PLAIN"
   override protected def kafkaServerSaslMechanisms = List("PLAIN")
-  override val clientPrincipal = JaasTestUtils.KafkaPlainUser
-  override val kafkaPrincipal = JaasTestUtils.KafkaPlainAdmin
+  override val clientPrincipal = "user"
+  override val kafkaPrincipal = "admin"
 
   /**
    * Checks that secure paths created by broker and acl paths created by 
AclCommand

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/core/src/test/scala/integration/kafka/api/SaslSetup.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala 
b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
index 1128ad0..f874f4e 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
@@ -28,6 +28,7 @@ import kafka.utils.{JaasTestUtils, TestUtils}
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.authenticator.LoginManager
 import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
 
 /*
  * Implements an enumeration for the modes enabled here:
@@ -119,7 +120,7 @@ trait SaslSetup {
   def kafkaServerSaslProperties(serverSaslMechanisms: Seq[String], 
interBrokerSaslMechanism: String): Properties = {
     val props = new Properties
     props.put(KafkaConfig.SaslMechanismInterBrokerProtocolProp, 
interBrokerSaslMechanism)
-    props.put(SaslConfigs.SASL_ENABLED_MECHANISMS, 
serverSaslMechanisms.mkString(","))
+    props.put(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, 
serverSaslMechanisms.mkString(","))
     props
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala 
b/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala
index caa988d..8d3f9b2 100644
--- 
a/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala
@@ -17,19 +17,42 @@
 
 package kafka.api
 
-import org.apache.kafka.common.config.SslConfigs
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
 import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.security.auth.{AuthenticationContext, 
KafkaPrincipal, KafkaPrincipalBuilder, SslAuthenticationContext}
 import org.junit.Before
 
+object SslEndToEndAuthorizationTest {
+  class TestPrincipalBuilder extends KafkaPrincipalBuilder {
+    private val Pattern = "O=A (.*?),CN=localhost".r
+
+    override def build(context: AuthenticationContext): KafkaPrincipal = {
+      context match {
+        case ctx: SslAuthenticationContext =>
+          ctx.session.getPeerPrincipal.getName match {
+            case Pattern(name) =>
+              new KafkaPrincipal(KafkaPrincipal.USER_TYPE, name)
+            case _ =>
+              KafkaPrincipal.ANONYMOUS
+          }
+      }
+    }
+  }
+}
+
 class SslEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
+  import kafka.api.SslEndToEndAuthorizationTest.TestPrincipalBuilder
+
   override protected def securityProtocol = SecurityProtocol.SSL
-  this.serverConfig.setProperty(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "required")
-  override val clientPrincipal = "O=A client,CN=localhost"
-  override val kafkaPrincipal = "O=A server,CN=localhost"
+  this.serverConfig.setProperty(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, 
"required")
+  
this.serverConfig.setProperty(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG,
 classOf[TestPrincipalBuilder].getName)
+  override val clientPrincipal = "client"
+  override val kafkaPrincipal = "server"
 
   @Before
-  override def setUp {
+  override def setUp() {
     startSasl(jaasSections(List.empty, None, ZkSasl))
-    super.setUp
+    super.setUp()
   }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala 
b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index c005c72..1b5841d 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -80,7 +80,7 @@ abstract class KafkaServerTestHarness extends 
ZooKeeperTestHarness {
 
   @Before
   override def setUp() {
-    super.setUp
+    super.setUp()
 
     if (configs.isEmpty)
       throw new KafkaException("Must supply at least one server config.")

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala 
b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
index 5dbd1a8..ca17b9a 100644
--- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
@@ -34,7 +34,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
   val simpleAclAuthorizer2 = new SimpleAclAuthorizer
   val testPrincipal = Acl.WildCardPrincipal
   val testHostName = InetAddress.getByName("192.168.0.1")
-  val session = new Session(testPrincipal, testHostName)
+  val session = Session(testPrincipal, testHostName)
   var resource: Resource = null
   val superUsers = "User:superuser1; User:superuser2"
   val username = "alice"
@@ -95,8 +95,8 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
 
     changeAclAndVerify(Set.empty[Acl], acls, Set.empty[Acl])
 
-    val host1Session = new Session(user1, host1)
-    val host2Session = new Session(user1, host2)
+    val host1Session = Session(user1, host1)
+    val host2Session = Session(user1, host2)
 
     assertTrue("User1 should have READ access from host2", 
simpleAclAuthorizer.authorize(host2Session, Read, resource))
     assertFalse("User1 should not have READ access from host1 due to denyAcl", 
simpleAclAuthorizer.authorize(host1Session, Read, resource))
@@ -108,8 +108,8 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
     assertFalse("User1 should not have edit access from host2", 
simpleAclAuthorizer.authorize(host2Session, Alter, resource))
 
     //test if user has READ and write access they also get describe access
-    val user2Session = new Session(user2, host1)
-    val user3Session = new Session(user3, host1)
+    val user2Session = Session(user2, host1)
+    val user3Session = Session(user3, host1)
     assertTrue("User2 should have DESCRIBE access from host1", 
simpleAclAuthorizer.authorize(user2Session, Describe, resource))
     assertTrue("User3 should have DESCRIBE access from host2", 
simpleAclAuthorizer.authorize(user3Session, Describe, resource))
     assertTrue("User2 should have READ access from host1", 
simpleAclAuthorizer.authorize(user2Session, Read, resource))
@@ -120,7 +120,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
   def testDenyTakesPrecedence() {
     val user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
     val host = InetAddress.getByName("192.168.2.1")
-    val session = new Session(user, host)
+    val session = Session(user, host)
 
     val allowAll = Acl.AllowAllAcl
     val denyAcl = new Acl(user, Deny, host.getHostAddress, All)
@@ -137,7 +137,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
 
     changeAclAndVerify(Set.empty[Acl], Set[Acl](allowAllAcl), Set.empty[Acl])
 
-    val session = new Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
"random"), InetAddress.getByName("192.0.4.4"))
+    val session = Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
"random"), InetAddress.getByName("192.0.4.4"))
     assertTrue("allow all acl should allow access to all.", 
simpleAclAuthorizer.authorize(session, Read, resource))
   }
 
@@ -147,8 +147,8 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
 
     changeAclAndVerify(Set.empty[Acl], Set[Acl](denyAllAcl), Set.empty[Acl])
 
-    val session1 = new Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
"superuser1"), InetAddress.getByName("192.0.4.4"))
-    val session2 = new Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
"superuser2"), InetAddress.getByName("192.0.4.4"))
+    val session1 = Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
"superuser1"), InetAddress.getByName("192.0.4.4"))
+    val session2 = Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
"superuser2"), InetAddress.getByName("192.0.4.4"))
 
     assertTrue("superuser always has access, no matter what acls.", 
simpleAclAuthorizer.authorize(session1, Read, resource))
     assertTrue("superuser always has access, no matter what acls.", 
simpleAclAuthorizer.authorize(session2, Read, resource))
@@ -165,7 +165,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
 
     val acls = changeAclAndVerify(Set.empty[Acl], Set[Acl](readAcl), 
Set.empty[Acl], wildCardResource)
 
-    val host1Session = new Session(user1, host1)
+    val host1Session = Session(user1, host1)
     assertTrue("User1 should have Read access from host1", 
simpleAclAuthorizer.authorize(host1Session, Read, resource))
 
     //allow Write to specific topic.

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala 
b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index aec68e2..3ec03c3 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -28,20 +28,19 @@ import org.apache.kafka.common.acl.{AccessControlEntry, 
AccessControlEntryFilter
 import org.apache.kafka.common.resource.{ResourceFilter, Resource => 
AdminResource, ResourceType => AdminResourceType}
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.apache.kafka.common.metrics.{KafkaMetric, Quota, Sensor}
-import org.apache.kafka.common.network.{Authenticator, ListenerName, 
TransportLayer}
+import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
 import org.apache.kafka.common.protocol.types.Struct
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation
 import org.apache.kafka.common.requests.{Resource => RResource, ResourceType 
=> RResourceType, _}
-import org.apache.kafka.common.security.auth.{DefaultPrincipalBuilder, 
KafkaPrincipal}
+import org.apache.kafka.common.security.auth.{AuthenticationContext, 
KafkaPrincipal, KafkaPrincipalBuilder}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 
-
 class RequestQuotaTest extends BaseRequestTest {
 
   override def numBrokers: Int = 1
@@ -85,7 +84,7 @@ class RequestQuotaTest extends BaseRequestTest {
     AdminUtils.changeClientIdConfig(zkUtils, unthrottledClientId, quotaProps)
 
     TestUtils.retry(10000) {
-      val quotaManager = servers(0).apis.quotas.request
+      val quotaManager = servers.head.apis.quotas.request
       assertEquals(s"Default request quota not set", Quota.upperBound(0.01), 
quotaManager.quota("some-user", "some-client"))
       assertEquals(s"Request quota override not set", Quota.upperBound(2000), 
quotaManager.quota("some-user", unthrottledClientId))
     }
@@ -445,8 +444,8 @@ object RequestQuotaTest {
       session.principal != UnauthorizedPrincipal
     }
   }
-  class TestPrincipalBuilder extends DefaultPrincipalBuilder {
-    override def buildPrincipal(transportLayer: TransportLayer,  
authenticator: Authenticator) = {
+  class TestPrincipalBuilder extends KafkaPrincipalBuilder {
+    override def build(context: AuthenticationContext): KafkaPrincipal = {
       principal
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
index 9517789..cd31f2c 100644
--- a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
@@ -158,9 +158,9 @@ object JaasTestUtils {
     kafkaClientModule(mechanism, keytabLocation, KafkaClientPrincipal, 
KafkaPlainUser, KafkaPlainPassword, KafkaScramUser, KafkaScramPassword).toString
 
   def zkSections: Seq[JaasSection] = Seq(
-    new JaasSection(ZkServerContextName, Seq(ZkDigestModule(debug = false,
+    JaasSection(ZkServerContextName, Seq(ZkDigestModule(debug = false,
       Map("user_super" -> ZkUserSuperPasswd, s"user_$ZkUser" -> 
ZkUserPassword)))),
-    new JaasSection(ZkClientContextName, Seq(ZkDigestModule(debug = false,
+    JaasSection(ZkClientContextName, Seq(ZkDigestModule(debug = false,
       Map("username" -> ZkUser, "password" -> ZkUserPassword))))
   )
 
@@ -191,7 +191,7 @@ object JaasTestUtils {
           debug = false)
       case mechanism => throw new IllegalArgumentException("Unsupported server 
mechanism " + mechanism)
     }
-    new JaasSection(contextName, modules)
+    JaasSection(contextName, modules)
   }
 
   // consider refactoring if more mechanisms are added
@@ -227,7 +227,7 @@ object JaasTestUtils {
    * Used for the static JAAS configuration and it uses the credentials for 
client#2
    */
   def kafkaClientSection(mechanism: Option[String], keytabLocation: 
Option[File]): JaasSection = {
-    new JaasSection(KafkaClientContextName, mechanism.map(m =>
+    JaasSection(KafkaClientContextName, mechanism.map(m =>
       kafkaClientModule(m, keytabLocation, KafkaClientPrincipal2, 
KafkaPlainUser2, KafkaPlainPassword2, KafkaScramUser2, 
KafkaScramPassword2)).toSeq)
   }
 

Reply via email to