This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 71f281a Add basic authentication plugin (#1087) 71f281a is described below commit 71f281ab34e4e5f9c22ea739f327d7620d0c49c7 Author: yush1ga <y.shiga.91+yush...@gmail.com> AuthorDate: Sat Feb 3 05:03:49 2018 +0900 Add basic authentication plugin (#1087) * Add password authentication plugin * Use .htpasswd * Simplify command data * Add PulsarHttpAuthenticationException to get realm information * Addressed PR comments * Fix tests * Deleted PulsarHttpAuthenticationException * Added .htpasswd to license exclude list --- pom.xml | 1 + .../AuthenticationProviderBasic.java | 142 +++++++++++++++++++++ .../api/AuthenticatedProducerConsumerTest.java | 86 +++++++++---- .../test/resources/authentication/basic/.htpasswd | 2 + pulsar-client/pom.xml | 6 + .../client/impl/auth/AuthenticationBasic.java | 72 +++++++++++ .../client/impl/auth/AuthenticationDataBasic.java | 61 +++++++++ 7 files changed, 343 insertions(+), 27 deletions(-) diff --git a/pom.xml b/pom.xml index 9eb1524..cbc7406 100644 --- a/pom.xml +++ b/pom.xml @@ -648,6 +648,7 @@ flexible messaging model and an intuitive client API.</description> <exclude>logs/**</exclude> <exclude>**/*.versionsBackup</exclude> <exclude>**/circe/**</exclude> + <exclude>pulsar-broker/src/test/resources/authentication/basic/.htpasswd</exclude> <exclude>pulsar-client-cpp/lib/checksum/int_types.h</exclude> <exclude>pulsar-client-cpp/lib/checksum/gf2.hpp</exclude> <exclude>pulsar-client-cpp/lib/checksum/crc32c_sse42.cc</exclude> diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderBasic.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderBasic.java new file mode 100644 index 0000000..f2f2f65 --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderBasic.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.broker.authentication; + +import org.apache.commons.codec.digest.Crypt; +import org.apache.commons.codec.digest.Md5Crypt; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.broker.ServiceConfiguration; + +import javax.naming.AuthenticationException; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.*; + +public class AuthenticationProviderBasic implements AuthenticationProvider { + private final static String HTTP_HEADER_NAME = "Authorization"; + private final static String CONF_SYSTEM_PROPERTY_KEY = "pulsar.auth.basic.conf"; + private Map<String, String> users; + + @Override + public void close() throws IOException { + // noop + } + + @Override + public void initialize(ServiceConfiguration config) throws IOException { + File confFile = new File(System.getProperty(CONF_SYSTEM_PROPERTY_KEY)); + if (!confFile.exists()) { + throw new IOException("The password auth conf file does not exist"); + } else if (!confFile.isFile()) { + throw new IOException("The path is not a file"); + } + BufferedReader reader = new BufferedReader(new FileReader(confFile)); + users = new HashMap<>(); + for (String line : reader.lines().toArray(s -> new String[s])) { + List<String> splitLine = Arrays.asList(line.split(":")); + if (splitLine.size() != 2) { + throw new IOException("The format of the password auth conf file is invalid"); + } + users.put(splitLine.get(0), splitLine.get(1)); + } + reader.close(); + } + + @Override + public String getAuthMethodName() { + return "basic"; + } + + @Override + public String authenticate(AuthenticationDataSource authData) throws AuthenticationException { + AuthParams authParams = new AuthParams(authData); + String userId = authParams.getUserId(); + String password = authParams.getPassword(); + String msg = "Unknown user or invalid password"; + + if (users.get(userId) == null) { + throw new AuthenticationException(msg); + } + + String encryptedPassword = users.get(userId); + + // For md5 algorithm + if ((users.get(userId).startsWith("$apr1"))) { + List<String> splitEncryptedPassword = Arrays.asList(encryptedPassword.split("\\$")); + if (splitEncryptedPassword.size() != 4 || !encryptedPassword + .equals(Md5Crypt.apr1Crypt(password.getBytes(), splitEncryptedPassword.get(2)))) { + throw new AuthenticationException(msg); + } + // For crypt algorithm + } else if (!encryptedPassword.equals(Crypt.crypt(password.getBytes(), encryptedPassword.substring(0, 2)))) { + throw new AuthenticationException(msg); + } + + return userId; + } + + private class AuthParams { + private String userId; + private String password; + + public AuthParams(AuthenticationDataSource authData) throws AuthenticationException { + String authParams; + if (authData.hasDataFromCommand()) { + authParams = authData.getCommandData(); + } else if (authData.hasDataFromHttp()) { + String rawAuthToken = authData.getHttpHeader(HTTP_HEADER_NAME); + // parsing and validation + if (StringUtils.isBlank(rawAuthToken) || !rawAuthToken.toUpperCase().startsWith("BASIC ")) { + throw new AuthenticationException("Authentication token has to be started with \"Basic \""); + } + String[] splitRawAuthToken = rawAuthToken.split(" "); + if (splitRawAuthToken.length != 2) { + throw new AuthenticationException("Base64 encoded token is not found"); + } + + try { + authParams = new String(Base64.getDecoder().decode(splitRawAuthToken[1])); + } catch (Exception e) { + throw new AuthenticationException("Base64 decoding is failure: " + e.getMessage()); + } + } else { + throw new AuthenticationException("Authentication data source does not have data"); + } + + String[] parsedAuthParams = authParams.split(":"); + if (parsedAuthParams.length != 2) { + throw new AuthenticationException("Base64 decoded params are invalid"); + } + + userId = parsedAuthParams[0]; + password = parsedAuthParams[1]; + } + + public String getUserId() { + return userId; + } + + public String getPassword() { + return password; + } + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java index 83b15f5..7292940 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java @@ -18,32 +18,17 @@ */ package org.apache.pulsar.client.api; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.spy; - -import java.net.URI; -import java.util.*; -import java.util.concurrent.TimeUnit; - -import javax.ws.rs.InternalServerErrorException; - +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.apache.pulsar.broker.authentication.AuthenticationProviderBasic; import org.apache.pulsar.broker.authentication.AuthenticationProviderTls; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.client.api.Authentication; -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.ConsumerConfiguration; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.ProducerConfiguration; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.auth.AuthenticationBasic; import org.apache.pulsar.client.impl.auth.AuthenticationTls; import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.PropertyAdmin; -import org.apache.pulsar.common.policies.data.SubscriptionAuthMode; -import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -52,8 +37,13 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; +import javax.ws.rs.InternalServerErrorException; +import java.net.URI; +import java.util.*; +import java.util.concurrent.TimeUnit; + +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase { private static final Logger log = LoggerFactory.getLogger(AuthenticatedProducerConsumerTest.class); @@ -64,6 +54,8 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase { private final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem"; private final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem"; + private final String BASIC_CONF_FILE_PATH = "./src/test/resources/authentication/basic/.htpasswd"; + @BeforeMethod @Override protected void setup() throws Exception { @@ -83,6 +75,7 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase { Set<String> superUserRoles = new HashSet<>(); superUserRoles.add("localhost"); superUserRoles.add("superUser"); + superUserRoles.add("superUser2"); conf.setSuperUserRoles(superUserRoles); conf.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName()); @@ -91,6 +84,8 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase { Set<String> providers = new HashSet<>(); providers.add(AuthenticationProviderTls.class.getName()); + providers.add(AuthenticationProviderBasic.class.getName()); + System.setProperty("pulsar.auth.basic.conf", BASIC_CONF_FILE_PATH); conf.setAuthenticationProviders(providers); conf.setClusterName("use"); @@ -107,7 +102,13 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase { clientConf.setUseTls(true); admin = spy(new PulsarAdmin(brokerUrlTls, clientConf)); - String lookupUrl = new URI("pulsar+ssl://localhost:" + BROKER_PORT_TLS).toString(); + String lookupUrl; + // For http basic authentication test + if (methodName.equals("testBasicCryptSyncProducerAndConsumer")) { + lookupUrl = new URI("https://localhost:" + BROKER_WEBSERVICE_PORT_TLS).toString(); + } else { + lookupUrl = new URI("pulsar+ssl://localhost:" + BROKER_PORT_TLS).toString(); + } pulsarClient = PulsarClient.create(lookupUrl, clientConf); } @@ -167,8 +168,6 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase { authTls.configure(authParams); internalSetup(authTls); - admin.clusters().updateCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(), - "pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS)); admin.properties().createProperty("my-property", new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use"))); admin.namespaces().createNamespace("my-property/use/my-ns"); @@ -179,6 +178,39 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase { } @Test(dataProvider = "batch") + public void testBasicCryptSyncProducerAndConsumer(int batchMessageDelayMs) throws Exception { + log.info("-- Starting {} test --", methodName); + AuthenticationBasic authPassword = new AuthenticationBasic(); + authPassword.configure("{\"userId\":\"superUser\",\"password\":\"supepass\"}"); + internalSetup(authPassword); + + admin.properties() + .createProperty("my-property", new PropertyAdmin(Lists.newArrayList(), Sets.newHashSet("use"))); + admin.namespaces().createNamespace("my-property/use/my-ns"); + + testSyncProducerAndConsumer(batchMessageDelayMs); + + log.info("-- Exiting {} test --", methodName); + } + + @Test(dataProvider = "batch") + public void testBasicArp1SyncProducerAndConsumer(int batchMessageDelayMs) throws Exception { + log.info("-- Starting {} test --", methodName); + AuthenticationBasic authPassword = new AuthenticationBasic(); + authPassword.configure("{\"userId\":\"superUser2\",\"password\":\"superpassword\"}"); + internalSetup(authPassword); + + admin.properties() + .createProperty("my-property", new PropertyAdmin(Lists.newArrayList(), Sets.newHashSet("use"))); + admin.namespaces().createNamespace("my-property/use/my-ns"); + + testSyncProducerAndConsumer(batchMessageDelayMs); + + log.info("-- Exiting {} test --", methodName); + } + + + @Test(dataProvider = "batch") public void testAnonymousSyncProducerAndConsumer(int batchMessageDelayMs) throws Exception { log.info("-- Starting {} test --", methodName); @@ -223,7 +255,7 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase { /** * Verifies: on 500 server error, broker invalidates session and client receives 500 correctly. - * + * * @throws Exception */ @Test @@ -254,7 +286,7 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase { /** * verifies that topicLookup/PartitionMetadataLookup gives InternalServerError(500) instead 401(auth_failed) on * unknown-exception failure - * + * * @throws Exception */ @Test @@ -292,4 +324,4 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase { } -} \ No newline at end of file +} diff --git a/pulsar-broker/src/test/resources/authentication/basic/.htpasswd b/pulsar-broker/src/test/resources/authentication/basic/.htpasswd new file mode 100644 index 0000000..b1a099a --- /dev/null +++ b/pulsar-broker/src/test/resources/authentication/basic/.htpasswd @@ -0,0 +1,2 @@ +superUser:mQQQIsyvvKRtU +superUser2:$apr1$foobarmq$kuSZlLgOITksCkRgl57ie/ diff --git a/pulsar-client/pom.xml b/pulsar-client/pom.xml index 1f0f441..3c26a9e 100644 --- a/pulsar-client/pom.xml +++ b/pulsar-client/pom.xml @@ -69,6 +69,12 @@ <groupId>org.bouncycastle</groupId> <artifactId>bcpkix-jdk15on</artifactId> </dependency> + + <dependency> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + </dependency> + </dependencies> <build> diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationBasic.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationBasic.java new file mode 100644 index 0000000..37454d2 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationBasic.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.client.impl.auth; + +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationDataProvider; +import org.apache.pulsar.client.api.EncodedAuthenticationParameterSupport; +import org.apache.pulsar.client.api.PulsarClientException; + +import java.io.IOException; +import java.util.Map; + +public class AuthenticationBasic implements Authentication, EncodedAuthenticationParameterSupport { + private String userId; + private String password; + + @Override + public void close() throws IOException { + // noop + } + + @Override + public String getAuthMethodName() { + return "basic"; + } + + @Override + public AuthenticationDataProvider getAuthData() throws PulsarClientException { + try { + return new AuthenticationDataBasic(userId, password); + } catch (Exception e) { + throw new PulsarClientException(e); + } + } + + @Override + public void configure(Map<String, String> authParams) { + configure(new Gson().toJson(authParams)); + } + + @Override + public void configure(String encodedAuthParamString) { + JsonObject params = new Gson().fromJson(encodedAuthParamString, JsonObject.class); + userId = params.get("userId").getAsString(); + password = params.get("password").getAsString(); + } + + @Override + public void start() throws PulsarClientException { + // noop + } + +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataBasic.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataBasic.java new file mode 100644 index 0000000..39b7d5f --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataBasic.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.client.impl.auth; + +import org.apache.pulsar.client.api.AuthenticationDataProvider; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import java.util.Base64; + +public class AuthenticationDataBasic implements AuthenticationDataProvider { + private final static String HTTP_HEADER_NAME = "Authorization"; + private String httpAuthToken; + private String commandAuthToken; + + public AuthenticationDataBasic(String userId, String password) { + httpAuthToken = "Basic " + Base64.getEncoder().encodeToString((userId + ":" + password).getBytes()); + commandAuthToken = userId+":"+password; + } + + @Override + public boolean hasDataForHttp() { + return true; + } + + @Override + public Set<Map.Entry<String, String>> getHttpHeaders() { + Map<String, String> headers = new HashMap<>(); + headers.put(HTTP_HEADER_NAME, httpAuthToken); + return headers.entrySet(); + } + + @Override + public boolean hasDataFromCommand() { + return true; + } + + @Override + public String getCommandData() { + return commandAuthToken; + } +} -- To stop receiving notification emails like this one, please contact mme...@apache.org.