Repository: kafka Updated Branches: refs/heads/trunk f7b1add68 -> 3b5d88feb
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerCallbackHandler.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerCallbackHandler.java index 52a34fc..72e06a2 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerCallbackHandler.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerCallbackHandler.java @@ -21,7 +21,6 @@ import java.util.Map; import org.apache.kafka.common.security.JaasContext; import org.apache.kafka.common.security.auth.AuthCallbackHandler; -import org.apache.kafka.common.security.kerberos.KerberosShortNamer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,7 +30,6 @@ import javax.security.auth.callback.UnsupportedCallbackException; import javax.security.sasl.AuthorizeCallback; import javax.security.sasl.RealmCallback; -import org.apache.kafka.common.security.kerberos.KerberosName; import org.apache.kafka.common.network.Mode; /** @@ -42,12 +40,10 @@ import org.apache.kafka.common.network.Mode; */ public class SaslServerCallbackHandler implements AuthCallbackHandler { private static final Logger LOG = LoggerFactory.getLogger(SaslServerCallbackHandler.class); - private final KerberosShortNamer kerberosShortNamer; private final JaasContext jaasContext; - public SaslServerCallbackHandler(JaasContext jaasContext, KerberosShortNamer kerberosNameParser) throws IOException { + public SaslServerCallbackHandler(JaasContext jaasContext) throws IOException { this.jaasContext = jaasContext; - this.kerberosShortNamer = kerberosNameParser; } @Override @@ -77,19 +73,10 @@ public class SaslServerCallbackHandler implements AuthCallbackHandler { private void 
handleAuthorizeCallback(AuthorizeCallback ac) { String authenticationID = ac.getAuthenticationID(); String authorizationID = ac.getAuthorizationID(); - - LOG.info("Successfully authenticated client: authenticationID={}; authorizationID={}.", authenticationID, - authorizationID); + LOG.info("Successfully authenticated client: authenticationID={}; authorizationID={}.", + authenticationID, authorizationID); ac.setAuthorized(true); - - KerberosName kerberosName = KerberosName.parse(authenticationID); - try { - String userName = kerberosShortNamer.shortName(kerberosName); - LOG.info("Setting authorizedID: {}", userName); - ac.setAuthorizedID(userName); - } catch (IOException e) { - LOG.error("Failed to set name for '{}' based on Kerberos authentication rules.", kerberosName, e); - } + ac.setAuthorizedID(authenticationID); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java index c7905bb..6b2fe74 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java +++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java @@ -19,6 +19,7 @@ package org.apache.kafka.common.security.ssl; import org.apache.kafka.common.Configurable; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; import org.apache.kafka.common.network.Mode; import org.apache.kafka.common.config.types.Password; @@ -97,7 +98,7 @@ public class SslFactory implements Configurable { String clientAuthConfig = clientAuthConfigOverride; if (clientAuthConfig == null) - clientAuthConfig = (String) 
configs.get(SslConfigs.SSL_CLIENT_AUTH_CONFIG); + clientAuthConfig = (String) configs.get(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG); if (clientAuthConfig != null) { if (clientAuthConfig.equals("required")) this.needClientAuth = true; http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/utils/SecurityUtils.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/utils/SecurityUtils.java b/clients/src/main/java/org/apache/kafka/common/utils/SecurityUtils.java new file mode 100644 index 0000000..9c9bd44 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/utils/SecurityUtils.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.utils; + +import org.apache.kafka.common.security.auth.KafkaPrincipal; + +public class SecurityUtils { + + public static KafkaPrincipal parseKafkaPrincipal(String str) { + if (str == null || str.isEmpty()) { + throw new IllegalArgumentException("expected a string in format principalType:principalName but got " + str); + } + + String[] split = str.split(":", 2); + + if (split.length != 2) { + throw new IllegalArgumentException("expected a string in format principalType:principalName but got " + str); + } + + return new KafkaPrincipal(split[0], split[1]); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/main/java/org/apache/kafka/common/utils/Utils.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index 2025107..1137045 100755 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.common.utils; -import java.text.DecimalFormat; import org.apache.kafka.common.KafkaException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,6 +44,7 @@ import java.nio.file.Path; import java.nio.file.SimpleFileVisitor; import java.nio.file.StandardCopyOption; import java.nio.file.attribute.BasicFileAttributes; +import java.text.DecimalFormat; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -719,7 +719,7 @@ public class Utils { try { closeable.close(); } catch (Throwable t) { - log.warn("Failed to close {}", name, t); + log.warn("Failed to close {} with type {}", name, closeable.getClass().getName(), t); } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/test/java/org/apache/kafka/common/network/CertStores.java 
---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/network/CertStores.java b/clients/src/test/java/org/apache/kafka/common/network/CertStores.java index b6cc1d4..916e619 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/CertStores.java +++ b/clients/src/test/java/org/apache/kafka/common/network/CertStores.java @@ -16,14 +16,14 @@ */ package org.apache.kafka.common.network; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.test.TestSslUtils; + import java.io.File; import java.net.InetAddress; import java.util.HashMap; import java.util.Map; -import org.apache.kafka.common.config.SslConfigs; -import org.apache.kafka.test.TestSslUtils; - public class CertStores { private final Map<String, Object> sslConfig; @@ -45,8 +45,6 @@ public class CertStores { Mode mode = server ? Mode.SERVER : Mode.CLIENT; File truststoreFile = File.createTempFile(name + "TS", ".jks"); sslConfig = TestSslUtils.createSslConfig(!server, true, mode, truststoreFile, name, commonName, certBuilder); - if (server) - sslConfig.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS)); } public Map<String, Object> getTrustingConfig(CertStores truststoreConfig) { http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java b/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java new file mode 100644 index 0000000..de210e7 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.network; + +import org.apache.kafka.common.Configurable; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; +import org.apache.kafka.common.security.auth.AuthenticationContext; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder; +import org.apache.kafka.common.security.auth.PlaintextAuthenticationContext; +import org.apache.kafka.common.security.auth.PrincipalBuilder; +import org.easymock.EasyMock; +import org.junit.Test; + +import java.net.InetAddress; +import java.security.Principal; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class ChannelBuildersTest { + + @Test + @SuppressWarnings("deprecation") + public void testCreateOldPrincipalBuilder() throws Exception { + TransportLayer transportLayer = EasyMock.mock(TransportLayer.class); + Authenticator authenticator = EasyMock.mock(Authenticator.class); + + Map<String, Object> configs = new HashMap<>(); + configs.put(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, OldPrincipalBuilder.class); + KafkaPrincipalBuilder builder = 
ChannelBuilders.createPrincipalBuilder(configs, transportLayer, authenticator, null); + + // test old principal builder is properly configured and delegated to + assertTrue(OldPrincipalBuilder.configured); + + // test delegation + KafkaPrincipal principal = builder.build(new PlaintextAuthenticationContext(InetAddress.getLocalHost())); + assertEquals(OldPrincipalBuilder.PRINCIPAL_NAME, principal.getName()); + assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType()); + } + + @Test + public void testCreateConfigurableKafkaPrincipalBuilder() { + Map<String, Object> configs = new HashMap<>(); + configs.put(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, ConfigurableKafkaPrincipalBuilder.class); + KafkaPrincipalBuilder builder = ChannelBuilders.createPrincipalBuilder(configs, null, null, null); + assertTrue(builder instanceof ConfigurableKafkaPrincipalBuilder); + assertTrue(((ConfigurableKafkaPrincipalBuilder) builder).configured); + } + + @SuppressWarnings("deprecation") + public static class OldPrincipalBuilder implements PrincipalBuilder { + private static boolean configured = false; + private static final String PRINCIPAL_NAME = "bob"; + + @Override + public void configure(Map<String, ?> configs) { + configured = true; + } + + @Override + public Principal buildPrincipal(TransportLayer transportLayer, Authenticator authenticator) throws KafkaException { + return new Principal() { + @Override + public String getName() { + return PRINCIPAL_NAME; + } + }; + } + + @Override + public void close() throws KafkaException { + + } + } + + public static class ConfigurableKafkaPrincipalBuilder implements KafkaPrincipalBuilder, Configurable { + private boolean configured = false; + + @Override + public void configure(Map<String, ?> configs) { + configured = true; + } + + @Override + public KafkaPrincipal build(AuthenticationContext context) { + return null; + } + } +} 
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java index 5683068..35f1377 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java @@ -16,27 +16,9 @@ */ package org.apache.kafka.common.network; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -import java.nio.channels.SelectionKey; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.io.File; -import java.io.IOException; -import java.net.InetSocketAddress; - import org.apache.kafka.common.memory.MemoryPool; import org.apache.kafka.common.memory.SimpleMemoryPool; import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.protocol.SecurityProtocol; import org.apache.kafka.common.security.ssl.SslFactory; import org.apache.kafka.common.utils.LogContext; @@ -46,6 +28,23 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.channels.SelectionKey; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static 
org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + /** * A set of tests for the selector. These use a test harness that runs a simple socket server that echos back responses. */ @@ -58,7 +57,6 @@ public class SslSelectorTest extends SelectorTest { File trustStoreFile = File.createTempFile("truststore", ".jks"); Map<String, Object> sslServerConfigs = TestSslUtils.createSslConfig(false, true, Mode.SERVER, trustStoreFile, "server"); - sslServerConfigs.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS)); this.server = new EchoServer(SecurityProtocol.SSL, sslServerConfigs); this.server.start(); this.time = new MockTime(); @@ -180,7 +178,6 @@ public class SslSelectorTest extends SelectorTest { //the initial channel builder is for clients, we need a server one File trustStoreFile = File.createTempFile("truststore", ".jks"); Map<String, Object> sslServerConfigs = TestSslUtils.createSslConfig(false, true, Mode.SERVER, trustStoreFile, "server"); - sslServerConfigs.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS)); channelBuilder = new SslChannelBuilder(Mode.SERVER); channelBuilder.configure(sslServerConfigs); selector = new Selector(NetworkReceive.UNLIMITED, 5000, metrics, time, "MetricGroup", http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java index 2ddd349..459a4af 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java +++ 
b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java @@ -16,34 +16,15 @@ */ package org.apache.kafka.common.network; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.Channels; -import java.nio.channels.SelectionKey; -import java.nio.channels.SocketChannel; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; - -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLEngine; -import javax.net.ssl.SSLParameters; - import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; import org.apache.kafka.common.config.types.Password; import org.apache.kafka.common.memory.MemoryPool; -import org.apache.kafka.common.security.TestSecurityConfig; -import org.apache.kafka.common.security.ssl.SslFactory; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.SecurityProtocol; +import org.apache.kafka.common.security.TestSecurityConfig; +import org.apache.kafka.common.security.ssl.SslFactory; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; @@ -53,6 +34,25 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLParameters; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; +import java.util.Arrays; 
+import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + /** * Tests for the SSL transport layer. These use a test harness that runs a simple socket server that echos back responses. */ @@ -260,7 +260,7 @@ public class SslTransportLayerTest { @Test public void testClientAuthenticationRequiredValidProvided() throws Exception { String node = "0"; - sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "required"); + sslServerConfigs.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "required"); server = createEchoServer(SecurityProtocol.SSL); createSelector(sslClientConfigs); InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); @@ -276,8 +276,8 @@ public class SslTransportLayerTest { public void testListenerConfigOverride() throws Exception { String node = "0"; ListenerName clientListenerName = new ListenerName("client"); - sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "required"); - sslServerConfigs.put(clientListenerName.configPrefix() + SslConfigs.SSL_CLIENT_AUTH_CONFIG, "none"); + sslServerConfigs.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "required"); + sslServerConfigs.put(clientListenerName.configPrefix() + BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "none"); // `client` listener is not configured at this point, so client auth should be required server = createEchoServer(SecurityProtocol.SSL); @@ -317,7 +317,7 @@ public class SslTransportLayerTest { public void testClientAuthenticationRequiredUntrustedProvided() throws Exception { String node = "0"; sslServerConfigs = serverCertStores.getUntrustingConfig(); - sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "required"); + sslServerConfigs.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "required"); server = createEchoServer(SecurityProtocol.SSL); createSelector(sslClientConfigs); InetSocketAddress addr = new 
InetSocketAddress("localhost", server.port()); @@ -333,7 +333,7 @@ public class SslTransportLayerTest { @Test public void testClientAuthenticationRequiredNotProvided() throws Exception { String node = "0"; - sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "required"); + sslServerConfigs.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "required"); server = createEchoServer(SecurityProtocol.SSL); sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG); @@ -354,7 +354,7 @@ public class SslTransportLayerTest { public void testClientAuthenticationDisabledUntrustedProvided() throws Exception { String node = "0"; sslServerConfigs = serverCertStores.getUntrustingConfig(); - sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "none"); + sslServerConfigs.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "none"); server = createEchoServer(SecurityProtocol.SSL); createSelector(sslClientConfigs); InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); @@ -370,7 +370,7 @@ public class SslTransportLayerTest { @Test public void testClientAuthenticationDisabledNotProvided() throws Exception { String node = "0"; - sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "none"); + sslServerConfigs.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "none"); server = createEchoServer(SecurityProtocol.SSL); sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG); @@ -390,7 +390,7 @@ public class SslTransportLayerTest { @Test public void testClientAuthenticationRequestedValidProvided() throws Exception { String node = "0"; - sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "requested"); + sslServerConfigs.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "requested"); server = createEchoServer(SecurityProtocol.SSL); createSelector(sslClientConfigs); InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); @@ -406,7 +406,7 @@ public class SslTransportLayerTest { @Test public void 
testClientAuthenticationRequestedNotProvided() throws Exception { String node = "0"; - sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "requested"); + sslServerConfigs.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "requested"); server = createEchoServer(SecurityProtocol.SSL); sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG); http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/test/java/org/apache/kafka/common/security/TestSecurityConfig.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/security/TestSecurityConfig.java b/clients/src/test/java/org/apache/kafka/common/security/TestSecurityConfig.java index 4cd5f5d..05294cf 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/TestSecurityConfig.java +++ b/clients/src/test/java/org/apache/kafka/common/security/TestSecurityConfig.java @@ -20,19 +20,19 @@ import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; -import org.apache.kafka.common.config.SaslConfigs; -import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; import java.util.Map; public class TestSecurityConfig extends AbstractConfig { private static final ConfigDef CONFIG = new ConfigDef() - .define(SslConfigs.SSL_CLIENT_AUTH_CONFIG, Type.STRING, null, Importance.MEDIUM, - SslConfigs.SSL_CLIENT_AUTH_DOC) - .define(SaslConfigs.SASL_ENABLED_MECHANISMS, Type.LIST, SaslConfigs.DEFAULT_SASL_ENABLED_MECHANISMS, - Importance.MEDIUM, SaslConfigs.SASL_ENABLED_MECHANISMS_DOC) - .define(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Type.CLASS, SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, - Importance.MEDIUM, SslConfigs.PRINCIPAL_BUILDER_CLASS_DOC) + .define(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, 
Type.STRING, null, Importance.MEDIUM, + BrokerSecurityConfigs.SSL_CLIENT_AUTH_DOC) + .define(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, Type.LIST, + BrokerSecurityConfigs.DEFAULT_SASL_ENABLED_MECHANISMS, + Importance.MEDIUM, BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_DOC) + .define(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Type.CLASS, + null, Importance.MEDIUM, BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_DOC) .withClientSslSupport() .withClientSaslSupport(); http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java new file mode 100644 index 0000000..fdf9e3c --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.security.auth; + +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.network.Authenticator; +import org.apache.kafka.common.network.TransportLayer; +import org.apache.kafka.common.protocol.SecurityProtocol; +import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder; +import org.apache.kafka.common.security.kerberos.KerberosName; +import org.apache.kafka.common.security.kerberos.KerberosShortNamer; +import org.apache.kafka.common.security.scram.ScramMechanism; +import org.easymock.EasyMock; +import org.easymock.EasyMockSupport; +import org.junit.Test; + +import javax.net.ssl.SSLSession; +import javax.security.sasl.SaslServer; +import java.net.InetAddress; +import java.security.Principal; + +import static org.junit.Assert.assertEquals; + +public class DefaultKafkaPrincipalBuilderTest extends EasyMockSupport { + + @Test + @SuppressWarnings("deprecation") + public void testUseOldPrincipalBuilderForPlaintextIfProvided() throws Exception { + TransportLayer transportLayer = mock(TransportLayer.class); + Authenticator authenticator = mock(Authenticator.class); + PrincipalBuilder oldPrincipalBuilder = mock(PrincipalBuilder.class); + + EasyMock.expect(oldPrincipalBuilder.buildPrincipal(transportLayer, authenticator)) + .andReturn(new DummyPrincipal("foo")); + oldPrincipalBuilder.close(); + EasyMock.expectLastCall(); + + replayAll(); + + DefaultKafkaPrincipalBuilder builder = DefaultKafkaPrincipalBuilder.fromOldPrincipalBuilder(authenticator, + transportLayer, oldPrincipalBuilder, null); + + KafkaPrincipal principal = builder.build(new PlaintextAuthenticationContext(InetAddress.getLocalHost())); + assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType()); + assertEquals("foo", principal.getName()); + + builder.close(); + + verifyAll(); + } + + @Test + public void testReturnAnonymousPrincipalForPlaintext() throws Exception { + DefaultKafkaPrincipalBuilder builder = new 
DefaultKafkaPrincipalBuilder(null); + assertEquals(KafkaPrincipal.ANONYMOUS, builder.build(new PlaintextAuthenticationContext(InetAddress.getLocalHost()))); + } + + @Test + @SuppressWarnings("deprecation") + public void testUseOldPrincipalBuilderForSslIfProvided() throws Exception { + TransportLayer transportLayer = mock(TransportLayer.class); + Authenticator authenticator = mock(Authenticator.class); + PrincipalBuilder oldPrincipalBuilder = mock(PrincipalBuilder.class); + SSLSession session = mock(SSLSession.class); + + EasyMock.expect(oldPrincipalBuilder.buildPrincipal(transportLayer, authenticator)) + .andReturn(new DummyPrincipal("foo")); + oldPrincipalBuilder.close(); + EasyMock.expectLastCall(); + + replayAll(); + + DefaultKafkaPrincipalBuilder builder = DefaultKafkaPrincipalBuilder.fromOldPrincipalBuilder(authenticator, + transportLayer, oldPrincipalBuilder, null); + + KafkaPrincipal principal = builder.build(new SslAuthenticationContext(session, InetAddress.getLocalHost())); + assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType()); + assertEquals("foo", principal.getName()); + + builder.close(); + + verifyAll(); + } + + @Test + public void testUseSessionPeerPrincipalForSsl() throws Exception { + SSLSession session = mock(SSLSession.class); + + EasyMock.expect(session.getPeerPrincipal()).andReturn(new DummyPrincipal("foo")); + + replayAll(); + + DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null); + + KafkaPrincipal principal = builder.build(new SslAuthenticationContext(session, InetAddress.getLocalHost())); + assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType()); + assertEquals("foo", principal.getName()); + + verifyAll(); + } + + @Test + public void testPrincipalBuilderScram() throws Exception { + SaslServer server = mock(SaslServer.class); + + EasyMock.expect(server.getMechanismName()).andReturn(ScramMechanism.SCRAM_SHA_256.mechanismName()); + 
EasyMock.expect(server.getAuthorizationID()).andReturn("foo"); + + replayAll(); + + DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null); + + KafkaPrincipal principal = builder.build(new SaslAuthenticationContext(server, + SecurityProtocol.SASL_PLAINTEXT, InetAddress.getLocalHost())); + assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType()); + assertEquals("foo", principal.getName()); + + verifyAll(); + } + + @Test + public void testPrincipalBuilderGssapi() throws Exception { + SaslServer server = mock(SaslServer.class); + KerberosShortNamer kerberosShortNamer = mock(KerberosShortNamer.class); + + EasyMock.expect(server.getMechanismName()).andReturn(SaslConfigs.GSSAPI_MECHANISM); + EasyMock.expect(server.getAuthorizationID()).andReturn("foo/host@REALM.COM"); + EasyMock.expect(kerberosShortNamer.shortName(EasyMock.anyObject(KerberosName.class))) + .andReturn("foo"); + + replayAll(); + + DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(kerberosShortNamer); + + KafkaPrincipal principal = builder.build(new SaslAuthenticationContext(server, + SecurityProtocol.SASL_PLAINTEXT, InetAddress.getLocalHost())); + assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType()); + assertEquals("foo", principal.getName()); + + verifyAll(); + } + + private static class DummyPrincipal implements Principal { + private final String name; + + private DummyPrincipal(String name) { + this.name = name; + } + + @Override + public String getName() { + return name; + } + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/test/java/org/apache/kafka/common/security/auth/KafkaPrincipalTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/security/auth/KafkaPrincipalTest.java b/clients/src/test/java/org/apache/kafka/common/security/auth/KafkaPrincipalTest.java index db905dd..f204a88 100644 ---
a/clients/src/test/java/org/apache/kafka/common/security/auth/KafkaPrincipalTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/auth/KafkaPrincipalTest.java @@ -22,15 +22,6 @@ import org.junit.Test; public class KafkaPrincipalTest { @Test - public void testPrincipalNameCanContainSeparator() { - String name = "name" + KafkaPrincipal.SEPARATOR + "with" + KafkaPrincipal.SEPARATOR + "in" + KafkaPrincipal.SEPARATOR + "it"; - - KafkaPrincipal principal = KafkaPrincipal.fromString(KafkaPrincipal.USER_TYPE + KafkaPrincipal.SEPARATOR + name); - Assert.assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType()); - Assert.assertEquals(name, principal.getName()); - } - - @Test public void testEqualsAndHashCode() { String name = "KafkaUser"; KafkaPrincipal principal1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, name); http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java index 6b0eca3..2dd7db9 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.common.security.authenticator; import org.apache.kafka.clients.NetworkClient; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; import org.apache.kafka.common.config.types.Password; import org.apache.kafka.common.network.CertStores; import org.apache.kafka.common.network.ChannelBuilder; @@ -634,7 +635,7 @@ public class 
SaslAuthenticatorTest { public void testDynamicJaasConfiguration() throws Exception { SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL; saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); - saslServerConfigs.put(SaslConfigs.SASL_ENABLED_MECHANISMS, Arrays.asList("PLAIN")); + saslServerConfigs.put(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, Arrays.asList("PLAIN")); Map<String, Object> serverOptions = new HashMap<>(); serverOptions.put("user_user1", "user1-secret"); serverOptions.put("user_user2", "user2-secret"); @@ -676,7 +677,7 @@ public class SaslAuthenticatorTest { public void testJaasConfigurationForListener() throws Exception { SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT; saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); - saslServerConfigs.put(SaslConfigs.SASL_ENABLED_MECHANISMS, Arrays.asList("PLAIN")); + saslServerConfigs.put(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, Arrays.asList("PLAIN")); TestJaasConfig staticJaasConfig = new TestJaasConfig(); @@ -782,7 +783,7 @@ public class SaslAuthenticatorTest { private TestJaasConfig configureMechanisms(String clientMechanism, List<String> serverMechanisms) { saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, clientMechanism); - saslServerConfigs.put(SaslConfigs.SASL_ENABLED_MECHANISMS, serverMechanisms); + saslServerConfigs.put(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, serverMechanisms); return TestJaasConfig.createConfiguration(clientMechanism, serverMechanisms); } @@ -860,7 +861,7 @@ public class SaslAuthenticatorTest { @SuppressWarnings("unchecked") private void updateScramCredentialCache(String username, String password) throws NoSuchAlgorithmException { - for (String mechanism : (List<String>) saslServerConfigs.get(SaslConfigs.SASL_ENABLED_MECHANISMS)) { + for (String mechanism : (List<String>) saslServerConfigs.get(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG)) { ScramMechanism scramMechanism = 
ScramMechanism.forMechanismName(mechanism); if (scramMechanism != null) { ScramFormatter formatter = new ScramFormatter(scramMechanism); http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java index d37c206..9d8be8d 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.common.security.authenticator; -import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; import org.apache.kafka.common.errors.IllegalSaslStateException; import org.apache.kafka.common.network.InvalidReceiveException; import org.apache.kafka.common.network.ListenerName; @@ -26,7 +26,6 @@ import org.apache.kafka.common.protocol.SecurityProtocol; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.requests.RequestHeader; import org.apache.kafka.common.security.JaasContext; -import org.apache.kafka.common.security.auth.PrincipalBuilder; import org.apache.kafka.common.security.plain.PlainLoginModule; import org.easymock.Capture; import org.easymock.EasyMock; @@ -35,7 +34,6 @@ import org.junit.Test; import javax.security.auth.Subject; import java.io.IOException; -import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.Collections; import java.util.HashMap; @@ -49,11 +47,10 @@ public class SaslServerAuthenticatorTest { @Test(expected = InvalidReceiveException.class) public void 
testOversizeRequest() throws IOException { - SaslServerAuthenticator authenticator = setupAuthenticator(); TransportLayer transportLayer = EasyMock.mock(TransportLayer.class); - PrincipalBuilder principalBuilder = null; // SASL authenticator does not currently use principal builder - Map<String, ?> configs = Collections.singletonMap(SaslConfigs.SASL_ENABLED_MECHANISMS, + Map<String, ?> configs = Collections.singletonMap(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, Collections.singletonList(SCRAM_SHA_256.mechanismName())); + SaslServerAuthenticator authenticator = setupAuthenticator(configs, transportLayer); final Capture<ByteBuffer> size = EasyMock.newCapture(); EasyMock.expect(transportLayer.read(EasyMock.capture(size))).andAnswer(new IAnswer<Integer>() { @@ -66,17 +63,15 @@ public class SaslServerAuthenticatorTest { EasyMock.replay(transportLayer); - authenticator.configure(transportLayer, principalBuilder, configs); authenticator.authenticate(); } @Test public void testUnexpectedRequestType() throws IOException { - SaslServerAuthenticator authenticator = setupAuthenticator(); TransportLayer transportLayer = EasyMock.mock(TransportLayer.class); - PrincipalBuilder principalBuilder = null; // SASL authenticator does not currently use principal builder - Map<String, ?> configs = Collections.singletonMap(SaslConfigs.SASL_ENABLED_MECHANISMS, + Map<String, ?> configs = Collections.singletonMap(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, Collections.singletonList(SCRAM_SHA_256.mechanismName())); + SaslServerAuthenticator authenticator = setupAuthenticator(configs, transportLayer); final RequestHeader header = new RequestHeader(ApiKeys.METADATA, (short) 0, "clientId", 13243); final Struct headerStruct = header.toStruct(); @@ -102,7 +97,6 @@ public class SaslServerAuthenticatorTest { EasyMock.replay(transportLayer); - authenticator.configure(transportLayer, principalBuilder, configs); try { authenticator.authenticate(); fail("Expected authenticate() to 
raise an exception"); @@ -111,13 +105,13 @@ public class SaslServerAuthenticatorTest { } } - private SaslServerAuthenticator setupAuthenticator() throws IOException { + private SaslServerAuthenticator setupAuthenticator(Map<String, ?> configs, TransportLayer transportLayer) throws IOException { TestJaasConfig jaasConfig = new TestJaasConfig(); jaasConfig.addEntry("jaasContext", PlainLoginModule.class.getName(), new HashMap<String, Object>()); JaasContext jaasContext = new JaasContext("jaasContext", JaasContext.Type.SERVER, jaasConfig); Subject subject = new Subject(); - return new SaslServerAuthenticator("node", jaasContext, subject, null, InetAddress.getLocalHost(), - new CredentialCache(), new ListenerName("ssl"), SecurityProtocol.SASL_SSL); + return new SaslServerAuthenticator(configs, "node", jaasContext, subject, null, new CredentialCache(), + new ListenerName("ssl"), SecurityProtocol.SASL_SSL, transportLayer); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/clients/src/test/java/org/apache/kafka/common/utils/SecurityUtilsTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/utils/SecurityUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/SecurityUtilsTest.java new file mode 100644 index 0000000..273c13a --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/utils/SecurityUtilsTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.utils; + +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class SecurityUtilsTest { + + @Test + public void testPrincipalNameCanContainSeparator() { + String name = "name:with:separator:in:it"; + KafkaPrincipal principal = SecurityUtils.parseKafkaPrincipal(KafkaPrincipal.USER_TYPE + ":" + name); + assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType()); + assertEquals(name, principal.getName()); + } + + @Test + public void testParseKafkaPrincipalWithNonUserPrincipalType() { + String name = "foo"; + String principalType = "Group"; + KafkaPrincipal principal = SecurityUtils.parseKafkaPrincipal(principalType + ":" + name); + assertEquals(principalType, principal.getPrincipalType()); + assertEquals(name, principal.getName()); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/core/src/main/scala/kafka/network/SocketServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index ffe75a8..de2bcde 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -31,14 +31,11 @@ import kafka.metrics.KafkaMetricsGroup import kafka.security.CredentialProvider import kafka.server.KafkaConfig import kafka.utils._ -import org.apache.kafka.common.errors.InvalidRequestException import 
org.apache.kafka.common.memory.{MemoryPool, SimpleMemoryPool} import org.apache.kafka.common.metrics._ import org.apache.kafka.common.metrics.stats.Rate import org.apache.kafka.common.network.{ChannelBuilder, ChannelBuilders, KafkaChannel, ListenerName, Selectable, Send, Selector => KSelector} import org.apache.kafka.common.protocol.SecurityProtocol -import org.apache.kafka.common.security.auth.KafkaPrincipal -import org.apache.kafka.common.protocol.types.SchemaException import org.apache.kafka.common.requests.{RequestContext, RequestHeader} import org.apache.kafka.common.utils.{KafkaThread, LogContext, Time} @@ -547,10 +544,9 @@ private[kafka] class Processor(val id: Int, try { openOrClosingChannel(receive.source) match { case Some(channel) => - val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName) val header = RequestHeader.parse(receive.payload) val context = new RequestContext(header, receive.source, channel.socketAddress, - principal, listenerName, securityProtocol) + channel.principal, listenerName, securityProtocol) val req = new RequestChannel.Request(processor = id, context = context, startTimeNanos = time.nanoseconds, memoryPool, receive.payload) requestChannel.sendRequest(req) http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/core/src/main/scala/kafka/server/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index de6559c..33eaf48 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -29,6 +29,7 @@ import kafka.utils.CoreUtils import kafka.utils.Implicits._ import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.common.config.ConfigDef.ValidList +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs import org.apache.kafka.common.config.{AbstractConfig, 
ConfigDef, ConfigException, SaslConfigs, SslConfigs, TopicConfig} import org.apache.kafka.common.metrics.Sensor import org.apache.kafka.common.network.ListenerName @@ -193,7 +194,6 @@ object Defaults { val MetricRecordingLevel = Sensor.RecordingLevel.INFO.toString() /** ********* SSL configuration ***********/ - val PrincipalBuilderClass = SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS val SslProtocol = SslConfigs.DEFAULT_SSL_PROTOCOL val SslEnabledProtocols = SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS val SslKeystoreType = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE @@ -212,7 +212,7 @@ object Defaults { val SaslKerberosTicketRenewWindowFactor = SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR val SaslKerberosTicketRenewJitter = SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER val SaslKerberosMinTimeBeforeRelogin = SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN - val SaslKerberosPrincipalToLocalRules = SaslConfigs.DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES + val SaslKerberosPrincipalToLocalRules = BrokerSecurityConfigs.DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES } object KafkaConfig { @@ -375,8 +375,10 @@ object KafkaConfig { val MetricReporterClassesProp: String = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG val MetricRecordingLevelProp: String = CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG + /** ******** Common Security Configuration *************/ + val PrincipalBuilderClassProp = BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG + /** ********* SSL Configuration ****************/ - val PrincipalBuilderClassProp = SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG val SslProtocolProp = SslConfigs.SSL_PROTOCOL_CONFIG val SslProviderProp = SslConfigs.SSL_PROVIDER_CONFIG val SslCipherSuitesProp = SslConfigs.SSL_CIPHER_SUITES_CONFIG @@ -392,17 +394,17 @@ object KafkaConfig { val SslTrustManagerAlgorithmProp = SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG val SslEndpointIdentificationAlgorithmProp = SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG 
val SslSecureRandomImplementationProp = SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG - val SslClientAuthProp = SslConfigs.SSL_CLIENT_AUTH_CONFIG + val SslClientAuthProp = BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG /** ********* SASL Configuration ****************/ val SaslMechanismInterBrokerProtocolProp = "sasl.mechanism.inter.broker.protocol" - val SaslEnabledMechanismsProp = SaslConfigs.SASL_ENABLED_MECHANISMS + val SaslEnabledMechanismsProp = BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG val SaslKerberosServiceNameProp = SaslConfigs.SASL_KERBEROS_SERVICE_NAME val SaslKerberosKinitCmdProp = SaslConfigs.SASL_KERBEROS_KINIT_CMD val SaslKerberosTicketRenewWindowFactorProp = SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR val SaslKerberosTicketRenewJitterProp = SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER val SaslKerberosMinTimeBeforeReloginProp = SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN - val SaslKerberosPrincipalToLocalRulesProp = SaslConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES + val SaslKerberosPrincipalToLocalRulesProp = BrokerSecurityConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_CONFIG /* Documentation */ /** ********* Zookeeper Configuration ***********/ @@ -636,8 +638,10 @@ object KafkaConfig { val MetricReporterClassesDoc = CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC val MetricRecordingLevelDoc = CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC + /** ******** Common Security Configuration *************/ + val PrincipalBuilderClassDoc = BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_DOC + /** ********* SSL Configuration ****************/ - val PrincipalBuilderClassDoc = SslConfigs.PRINCIPAL_BUILDER_CLASS_DOC val SslProtocolDoc = SslConfigs.SSL_PROTOCOL_DOC val SslProviderDoc = SslConfigs.SSL_PROVIDER_DOC val SslCipherSuitesDoc = SslConfigs.SSL_CIPHER_SUITES_DOC @@ -653,7 +657,7 @@ object KafkaConfig { val SslTrustManagerAlgorithmDoc = SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC val SslEndpointIdentificationAlgorithmDoc = 
SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC val SslSecureRandomImplementationDoc = SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_DOC - val SslClientAuthDoc = SslConfigs.SSL_CLIENT_AUTH_DOC + val SslClientAuthDoc = BrokerSecurityConfigs.SSL_CLIENT_AUTH_DOC /** ********* Sasl Configuration ****************/ val SaslMechanismInterBrokerProtocolDoc = "SASL mechanism used for inter-broker communication. Default is GSSAPI." @@ -663,7 +667,7 @@ object KafkaConfig { val SaslKerberosTicketRenewWindowFactorDoc = SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC val SaslKerberosTicketRenewJitterDoc = SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER_DOC val SaslKerberosMinTimeBeforeReloginDoc = SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC - val SaslKerberosPrincipalToLocalRulesDoc = SaslConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_DOC + val SaslKerberosPrincipalToLocalRulesDoc = BrokerSecurityConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_DOC private val configDef = { import ConfigDef.Importance._ @@ -834,7 +838,7 @@ object KafkaConfig { .define(ReplicationQuotaWindowSizeSecondsProp, INT, Defaults.ReplicationQuotaWindowSizeSeconds, atLeast(1), LOW, ReplicationQuotaWindowSizeSecondsDoc) /** ********* SSL Configuration ****************/ - .define(PrincipalBuilderClassProp, CLASS, Defaults.PrincipalBuilderClass, MEDIUM, PrincipalBuilderClassDoc) + .define(PrincipalBuilderClassProp, CLASS, null, MEDIUM, PrincipalBuilderClassDoc) .define(SslProtocolProp, STRING, Defaults.SslProtocol, MEDIUM, SslProtocolDoc) .define(SslProviderProp, STRING, null, MEDIUM, SslProviderDoc) .define(SslEnabledProtocolsProp, LIST, Defaults.SslEnabledProtocols, MEDIUM, SslEnabledProtocolsDoc) http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala 
b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala index 3376d23..bbb3249 100644 --- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala @@ -69,7 +69,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas val topicWildcard = "*" val part = 0 val tp = new TopicPartition(topic, part) - val topicAndPartition = new TopicAndPartition(topic, part) + val topicAndPartition = TopicAndPartition(topic, part) val clientPrincipal: String val kafkaPrincipal: String @@ -154,8 +154,8 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas * Starts MiniKDC and only then sets up the parent trait. */ @Before - override def setUp { - super.setUp + override def setUp() { + super.setUp() AclCommand.main(topicBrokerReadAclArgs) servers.foreach { s => TestUtils.waitAndVerifyAcls(TopicBrokerReadAcl, s.apis.authorizer.get, new Resource(Topic, "*")) @@ -177,9 +177,9 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas * Closes MiniKDC last when tearing down. 
*/ @After - override def tearDown { + override def tearDown() { consumers.foreach(_.wakeup()) - super.tearDown + super.tearDown() closeSasl() } @@ -240,7 +240,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas */ @Test(expected = classOf[KafkaException]) def testNoConsumeWithoutDescribeAclViaAssign(): Unit = { - noConsumeWithoutDescribeAclSetup + noConsumeWithoutDescribeAclSetup() consumers.head.assign(List(tp).asJava) // the exception is expected when the consumer attempts to lookup offsets consumeRecords(this.consumers.head) @@ -248,7 +248,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas @Test(expected = classOf[TimeoutException]) def testNoConsumeWithoutDescribeAclViaSubscribe(): Unit = { - noConsumeWithoutDescribeAclSetup + noConsumeWithoutDescribeAclSetup() consumers.head.subscribe(List(topic).asJava) // this should timeout since the consumer will not be able to fetch any metadata for the topic consumeRecords(this.consumers.head, timeout = 3000) @@ -273,7 +273,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas @Test def testNoConsumeWithDescribeAclViaAssign(): Unit = { - noConsumeWithDescribeAclSetup + noConsumeWithDescribeAclSetup() consumers.head.assign(List(tp).asJava) try { @@ -287,7 +287,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas @Test def testNoConsumeWithDescribeAclViaSubscribe(): Unit = { - noConsumeWithDescribeAclSetup + noConsumeWithDescribeAclSetup() consumers.head.subscribe(List(topic).asJava) try { http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala index b8dc57b..4b27239 100644 --- 
a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala +++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala @@ -26,6 +26,7 @@ import java.util.Properties import org.apache.kafka.clients.producer.KafkaProducer import kafka.server.KafkaConfig import kafka.integration.KafkaServerTestHarness +import org.apache.kafka.common.network.ListenerName import org.junit.{After, Before} import scala.collection.mutable.Buffer @@ -46,14 +47,21 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { val consumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]() val producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]() + protected def interBrokerListenerName: ListenerName = listenerName + override def generateConfigs = { val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol), trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, logDirCount = logDirCount) cfgs.foreach { config => - config.setProperty(KafkaConfig.ListenersProp, s"${listenerName.value}://localhost:${TestUtils.RandomPort}") config.remove(KafkaConfig.InterBrokerSecurityProtocolProp) - config.setProperty(KafkaConfig.InterBrokerListenerNameProp, listenerName.value) - config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, s"${listenerName.value}:${securityProtocol.name}") + config.setProperty(KafkaConfig.InterBrokerListenerNameProp, interBrokerListenerName.value) + + val listenerNames = Set(listenerName, interBrokerListenerName) + val listeners = listenerNames.map(listenerName => s"${listenerName.value}://localhost:${TestUtils.RandomPort}").mkString(",") + val listenerSecurityMap = listenerNames.map(listenerName => s"${listenerName.value}:${securityProtocol.name}").mkString(",") + + config.setProperty(KafkaConfig.ListenersProp, listeners) + config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, listenerSecurityMap) } cfgs.foreach(_ ++= serverConfig) 
cfgs.map(KafkaConfig.fromProps) http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala new file mode 100644 index 0000000..2aeccb4 --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.api + +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs +import org.apache.kafka.common.network.ListenerName +import org.apache.kafka.common.protocol.SecurityProtocol +import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, KafkaPrincipalBuilder, PlaintextAuthenticationContext} +import org.junit.Before + +// This test case uses a separate listener for client and inter-broker communication, from +// which we derive corresponding principals +object PlaintextEndToEndAuthorizationTest { + class TestClientPrincipalBuilder extends KafkaPrincipalBuilder { + override def build(context: AuthenticationContext): KafkaPrincipal = { + context match { + case ctx: PlaintextAuthenticationContext if ctx.clientAddress != null => + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "client") + case _ => + KafkaPrincipal.ANONYMOUS + } + } + } + + class TestServerPrincipalBuilder extends KafkaPrincipalBuilder { + override def build(context: AuthenticationContext): KafkaPrincipal = { + context match { + case ctx: PlaintextAuthenticationContext => + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "server") + case _ => + KafkaPrincipal.ANONYMOUS + } + } + } +} + +class PlaintextEndToEndAuthorizationTest extends EndToEndAuthorizationTest { + import PlaintextEndToEndAuthorizationTest.{TestClientPrincipalBuilder, TestServerPrincipalBuilder} + + override protected def securityProtocol = SecurityProtocol.PLAINTEXT + override protected def listenerName: ListenerName = new ListenerName("CLIENT") + override protected def interBrokerListenerName: ListenerName = new ListenerName("SERVER") + + this.serverConfig.setProperty("listener.name.client." + BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, + classOf[TestClientPrincipalBuilder].getName) + this.serverConfig.setProperty("listener.name.server." 
+ BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, + classOf[TestServerPrincipalBuilder].getName) + override val clientPrincipal = "client" + override val kafkaPrincipal = "server" + + @Before + override def setUp() { + startSasl(jaasSections(List.empty, None, ZkSasl)) + super.setUp() + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala index 975ca4c..c28de3e 100644 --- a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala @@ -17,13 +17,38 @@ package kafka.api import kafka.utils.{JaasTestUtils, TestUtils} +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs +import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, KafkaPrincipalBuilder, SaslAuthenticationContext} import org.junit.Test +object SaslPlainSslEndToEndAuthorizationTest { + class TestPrincipalBuilder extends KafkaPrincipalBuilder { + + override def build(context: AuthenticationContext): KafkaPrincipal = { + context match { + case ctx: SaslAuthenticationContext => + ctx.server.getAuthorizationID match { + case JaasTestUtils.KafkaPlainAdmin => + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "admin") + case JaasTestUtils.KafkaPlainUser => + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user") + case _ => + KafkaPrincipal.ANONYMOUS + } + } + } + } +} + class SaslPlainSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTest { + import SaslPlainSslEndToEndAuthorizationTest.TestPrincipalBuilder + + 
this.serverConfig.setProperty(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, classOf[TestPrincipalBuilder].getName) + override protected def kafkaClientSaslMechanism = "PLAIN" override protected def kafkaServerSaslMechanisms = List("PLAIN") - override val clientPrincipal = JaasTestUtils.KafkaPlainUser - override val kafkaPrincipal = JaasTestUtils.KafkaPlainAdmin + override val clientPrincipal = "user" + override val kafkaPrincipal = "admin" /** * Checks that secure paths created by broker and acl paths created by AclCommand http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/core/src/test/scala/integration/kafka/api/SaslSetup.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala b/core/src/test/scala/integration/kafka/api/SaslSetup.scala index 1128ad0..f874f4e 100644 --- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala +++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala @@ -28,6 +28,7 @@ import kafka.utils.{JaasTestUtils, TestUtils} import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.security.authenticator.LoginManager import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs /* * Implements an enumeration for the modes enabled here: @@ -119,7 +120,7 @@ trait SaslSetup { def kafkaServerSaslProperties(serverSaslMechanisms: Seq[String], interBrokerSaslMechanism: String): Properties = { val props = new Properties props.put(KafkaConfig.SaslMechanismInterBrokerProtocolProp, interBrokerSaslMechanism) - props.put(SaslConfigs.SASL_ENABLED_MECHANISMS, serverSaslMechanisms.mkString(",")) + props.put(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, serverSaslMechanisms.mkString(",")) props } http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala 
---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala index caa988d..8d3f9b2 100644 --- a/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala @@ -17,19 +17,42 @@ package kafka.api -import org.apache.kafka.common.config.SslConfigs +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs import org.apache.kafka.common.protocol.SecurityProtocol +import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, KafkaPrincipalBuilder, SslAuthenticationContext} import org.junit.Before +object SslEndToEndAuthorizationTest { + class TestPrincipalBuilder extends KafkaPrincipalBuilder { + private val Pattern = "O=A (.*?),CN=localhost".r + + override def build(context: AuthenticationContext): KafkaPrincipal = { + context match { + case ctx: SslAuthenticationContext => + ctx.session.getPeerPrincipal.getName match { + case Pattern(name) => + new KafkaPrincipal(KafkaPrincipal.USER_TYPE, name) + case _ => + KafkaPrincipal.ANONYMOUS + } + } + } + } +} + class SslEndToEndAuthorizationTest extends EndToEndAuthorizationTest { + import kafka.api.SslEndToEndAuthorizationTest.TestPrincipalBuilder + override protected def securityProtocol = SecurityProtocol.SSL - this.serverConfig.setProperty(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "required") - override val clientPrincipal = "O=A client,CN=localhost" - override val kafkaPrincipal = "O=A server,CN=localhost" + this.serverConfig.setProperty(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, "required") + this.serverConfig.setProperty(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, classOf[TestPrincipalBuilder].getName) + override val clientPrincipal = "client" + override val kafkaPrincipal = "server" @Before - override 
def setUp { + override def setUp() { startSasl(jaasSections(List.empty, None, ZkSasl)) - super.setUp + super.setUp() } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala index c005c72..1b5841d 100755 --- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala @@ -80,7 +80,7 @@ abstract class KafkaServerTestHarness extends ZooKeeperTestHarness { @Before override def setUp() { - super.setUp + super.setUp() if (configs.isEmpty) throw new KafkaException("Must supply at least one server config.") http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala index 5dbd1a8..ca17b9a 100644 --- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala +++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala @@ -34,7 +34,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness { val simpleAclAuthorizer2 = new SimpleAclAuthorizer val testPrincipal = Acl.WildCardPrincipal val testHostName = InetAddress.getByName("192.168.0.1") - val session = new Session(testPrincipal, testHostName) + val session = Session(testPrincipal, testHostName) var resource: Resource = null val superUsers = "User:superuser1; User:superuser2" val username = "alice" @@ -95,8 +95,8 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness { 
changeAclAndVerify(Set.empty[Acl], acls, Set.empty[Acl]) - val host1Session = new Session(user1, host1) - val host2Session = new Session(user1, host2) + val host1Session = Session(user1, host1) + val host2Session = Session(user1, host2) assertTrue("User1 should have READ access from host2", simpleAclAuthorizer.authorize(host2Session, Read, resource)) assertFalse("User1 should not have READ access from host1 due to denyAcl", simpleAclAuthorizer.authorize(host1Session, Read, resource)) @@ -108,8 +108,8 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness { assertFalse("User1 should not have edit access from host2", simpleAclAuthorizer.authorize(host2Session, Alter, resource)) //test if user has READ and write access they also get describe access - val user2Session = new Session(user2, host1) - val user3Session = new Session(user3, host1) + val user2Session = Session(user2, host1) + val user3Session = Session(user3, host1) assertTrue("User2 should have DESCRIBE access from host1", simpleAclAuthorizer.authorize(user2Session, Describe, resource)) assertTrue("User3 should have DESCRIBE access from host2", simpleAclAuthorizer.authorize(user3Session, Describe, resource)) assertTrue("User2 should have READ access from host1", simpleAclAuthorizer.authorize(user2Session, Read, resource)) @@ -120,7 +120,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness { def testDenyTakesPrecedence() { val user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username) val host = InetAddress.getByName("192.168.2.1") - val session = new Session(user, host) + val session = Session(user, host) val allowAll = Acl.AllowAllAcl val denyAcl = new Acl(user, Deny, host.getHostAddress, All) @@ -137,7 +137,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness { changeAclAndVerify(Set.empty[Acl], Set[Acl](allowAllAcl), Set.empty[Acl]) - val session = new Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "random"), InetAddress.getByName("192.0.4.4")) + val session = 
Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "random"), InetAddress.getByName("192.0.4.4")) assertTrue("allow all acl should allow access to all.", simpleAclAuthorizer.authorize(session, Read, resource)) } @@ -147,8 +147,8 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness { changeAclAndVerify(Set.empty[Acl], Set[Acl](denyAllAcl), Set.empty[Acl]) - val session1 = new Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "superuser1"), InetAddress.getByName("192.0.4.4")) - val session2 = new Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "superuser2"), InetAddress.getByName("192.0.4.4")) + val session1 = Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "superuser1"), InetAddress.getByName("192.0.4.4")) + val session2 = Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "superuser2"), InetAddress.getByName("192.0.4.4")) assertTrue("superuser always has access, no matter what acls.", simpleAclAuthorizer.authorize(session1, Read, resource)) assertTrue("superuser always has access, no matter what acls.", simpleAclAuthorizer.authorize(session2, Read, resource)) @@ -165,7 +165,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness { val acls = changeAclAndVerify(Set.empty[Acl], Set[Acl](readAcl), Set.empty[Acl], wildCardResource) - val host1Session = new Session(user1, host1) + val host1Session = Session(user1, host1) assertTrue("User1 should have Read access from host1", simpleAclAuthorizer.authorize(host1Session, Read, resource)) //allow Write to specific topic. 
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index aec68e2..3ec03c3 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -28,20 +28,19 @@ import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter import org.apache.kafka.common.resource.{ResourceFilter, Resource => AdminResource, ResourceType => AdminResourceType} import org.apache.kafka.common.{Node, TopicPartition} import org.apache.kafka.common.metrics.{KafkaMetric, Quota, Sensor} -import org.apache.kafka.common.network.{Authenticator, ListenerName, TransportLayer} +import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol} import org.apache.kafka.common.protocol.types.Struct import org.apache.kafka.common.record._ import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation import org.apache.kafka.common.requests.{Resource => RResource, ResourceType => RResourceType, _} -import org.apache.kafka.common.security.auth.{DefaultPrincipalBuilder, KafkaPrincipal} +import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, KafkaPrincipalBuilder} import org.junit.Assert._ import org.junit.{After, Before, Test} import scala.collection.JavaConverters._ import scala.collection.mutable.ListBuffer - class RequestQuotaTest extends BaseRequestTest { override def numBrokers: Int = 1 @@ -85,7 +84,7 @@ class RequestQuotaTest extends BaseRequestTest { AdminUtils.changeClientIdConfig(zkUtils, unthrottledClientId, quotaProps) TestUtils.retry(10000) { - val quotaManager = servers(0).apis.quotas.request + val quotaManager = 
servers.head.apis.quotas.request assertEquals(s"Default request quota not set", Quota.upperBound(0.01), quotaManager.quota("some-user", "some-client")) assertEquals(s"Request quota override not set", Quota.upperBound(2000), quotaManager.quota("some-user", unthrottledClientId)) } @@ -445,8 +444,8 @@ object RequestQuotaTest { session.principal != UnauthorizedPrincipal } } - class TestPrincipalBuilder extends DefaultPrincipalBuilder { - override def buildPrincipal(transportLayer: TransportLayer, authenticator: Authenticator) = { + class TestPrincipalBuilder extends KafkaPrincipalBuilder { + override def build(context: AuthenticationContext): KafkaPrincipal = { principal } } http://git-wip-us.apache.org/repos/asf/kafka/blob/3b5d88fe/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala index 9517789..cd31f2c 100644 --- a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala @@ -158,9 +158,9 @@ object JaasTestUtils { kafkaClientModule(mechanism, keytabLocation, KafkaClientPrincipal, KafkaPlainUser, KafkaPlainPassword, KafkaScramUser, KafkaScramPassword).toString def zkSections: Seq[JaasSection] = Seq( - new JaasSection(ZkServerContextName, Seq(ZkDigestModule(debug = false, + JaasSection(ZkServerContextName, Seq(ZkDigestModule(debug = false, Map("user_super" -> ZkUserSuperPasswd, s"user_$ZkUser" -> ZkUserPassword)))), - new JaasSection(ZkClientContextName, Seq(ZkDigestModule(debug = false, + JaasSection(ZkClientContextName, Seq(ZkDigestModule(debug = false, Map("username" -> ZkUser, "password" -> ZkUserPassword)))) ) @@ -191,7 +191,7 @@ object JaasTestUtils { debug = false) case mechanism => throw new IllegalArgumentException("Unsupported server mechanism " + mechanism) } - new JaasSection(contextName, 
modules) + JaasSection(contextName, modules) } // consider refactoring if more mechanisms are added @@ -227,7 +227,7 @@ object JaasTestUtils { * Used for the static JAAS configuration and it uses the credentials for client#2 */ def kafkaClientSection(mechanism: Option[String], keytabLocation: Option[File]): JaasSection = { - new JaasSection(KafkaClientContextName, mechanism.map(m => + JaasSection(KafkaClientContextName, mechanism.map(m => kafkaClientModule(m, keytabLocation, KafkaClientPrincipal2, KafkaPlainUser2, KafkaPlainPassword2, KafkaScramUser2, KafkaScramPassword2)).toSeq) }