This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 71f281a  Add basic authentication plugin (#1087)
71f281a is described below

commit 71f281ab34e4e5f9c22ea739f327d7620d0c49c7
Author: yush1ga <y.shiga.91+yush...@gmail.com>
AuthorDate: Sat Feb 3 05:03:49 2018 +0900

    Add basic authentication plugin (#1087)
    
    * Add password authentication plugin
    
    * Use .htpasswd
    
    * Simplify command data
    
    * Add PulsarHttpAuthenticationException to get realm information
    
    * Addressed PR comments
    
    * Fix tests
    
    * Deleted PulsarHttpAuthenticationException
    
    * Added .htpasswd to license exclude list
---
 pom.xml                                            |   1 +
 .../AuthenticationProviderBasic.java               | 142 +++++++++++++++++++++
 .../api/AuthenticatedProducerConsumerTest.java     |  86 +++++++++----
 .../test/resources/authentication/basic/.htpasswd  |   2 +
 pulsar-client/pom.xml                              |   6 +
 .../client/impl/auth/AuthenticationBasic.java      |  72 +++++++++++
 .../client/impl/auth/AuthenticationDataBasic.java  |  61 +++++++++
 7 files changed, 343 insertions(+), 27 deletions(-)

diff --git a/pom.xml b/pom.xml
index 9eb1524..cbc7406 100644
--- a/pom.xml
+++ b/pom.xml
@@ -648,6 +648,7 @@ flexible messaging model and an intuitive client 
API.</description>
             <exclude>logs/**</exclude>
             <exclude>**/*.versionsBackup</exclude>
             <exclude>**/circe/**</exclude>
+            
<exclude>pulsar-broker/src/test/resources/authentication/basic/.htpasswd</exclude>
             <exclude>pulsar-client-cpp/lib/checksum/int_types.h</exclude>
             <exclude>pulsar-client-cpp/lib/checksum/gf2.hpp</exclude>
             <exclude>pulsar-client-cpp/lib/checksum/crc32c_sse42.cc</exclude>
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderBasic.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderBasic.java
new file mode 100644
index 0000000..f2f2f65
--- /dev/null
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderBasic.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.authentication;
+
+import org.apache.commons.codec.digest.Crypt;
+import org.apache.commons.codec.digest.Md5Crypt;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.ServiceConfiguration;
+
+import javax.naming.AuthenticationException;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.*;
+
+public class AuthenticationProviderBasic implements AuthenticationProvider {
+    private final static String HTTP_HEADER_NAME = "Authorization";
+    private final static String CONF_SYSTEM_PROPERTY_KEY = 
"pulsar.auth.basic.conf";
+    private Map<String, String> users;
+
+    @Override
+    public void close() throws IOException {
+        // noop
+    }
+
+    @Override
+    public void initialize(ServiceConfiguration config) throws IOException {
+        File confFile = new File(System.getProperty(CONF_SYSTEM_PROPERTY_KEY));
+        if (!confFile.exists()) {
+            throw new IOException("The password auth conf file does not 
exist");
+        } else if (!confFile.isFile()) {
+            throw new IOException("The path is not a file");
+        }
+        BufferedReader reader = new BufferedReader(new FileReader(confFile));
+        users = new HashMap<>();
+        for (String line : reader.lines().toArray(s -> new String[s])) {
+            List<String> splitLine = Arrays.asList(line.split(":"));
+            if (splitLine.size() != 2) {
+                throw new IOException("The format of the password auth conf 
file is invalid");
+            }
+            users.put(splitLine.get(0), splitLine.get(1));
+        }
+        reader.close();
+    }
+
+    @Override
+    public String getAuthMethodName() {
+        return "basic";
+    }
+
+    @Override
+    public String authenticate(AuthenticationDataSource authData) throws 
AuthenticationException {
+        AuthParams authParams = new AuthParams(authData);
+        String userId = authParams.getUserId();
+        String password = authParams.getPassword();
+        String msg = "Unknown user or invalid password";
+
+        if (users.get(userId) == null) {
+            throw new AuthenticationException(msg);
+        }
+
+        String encryptedPassword = users.get(userId);
+
+        // For md5 algorithm
+        if ((users.get(userId).startsWith("$apr1"))) {
+            List<String> splitEncryptedPassword = 
Arrays.asList(encryptedPassword.split("\\$"));
+            if (splitEncryptedPassword.size() != 4 || !encryptedPassword
+                    .equals(Md5Crypt.apr1Crypt(password.getBytes(), 
splitEncryptedPassword.get(2)))) {
+                throw new AuthenticationException(msg);
+            }
+        // For crypt algorithm
+        } else if (!encryptedPassword.equals(Crypt.crypt(password.getBytes(), 
encryptedPassword.substring(0, 2)))) {
+            throw new AuthenticationException(msg);
+        }
+
+        return userId;
+    }
+
+    private class AuthParams {
+        private String userId;
+        private String password;
+
+        public AuthParams(AuthenticationDataSource authData) throws 
AuthenticationException {
+            String authParams;
+            if (authData.hasDataFromCommand()) {
+                authParams = authData.getCommandData();
+            } else if (authData.hasDataFromHttp()) {
+                String rawAuthToken = authData.getHttpHeader(HTTP_HEADER_NAME);
+                // parsing and validation
+                if (StringUtils.isBlank(rawAuthToken) || 
!rawAuthToken.toUpperCase().startsWith("BASIC ")) {
+                    throw new AuthenticationException("Authentication token 
has to be started with \"Basic \"");
+                }
+                String[] splitRawAuthToken = rawAuthToken.split(" ");
+                if (splitRawAuthToken.length != 2) {
+                    throw new AuthenticationException("Base64 encoded token is 
not found");
+                }
+
+                try {
+                    authParams = new 
String(Base64.getDecoder().decode(splitRawAuthToken[1]));
+                } catch (Exception e) {
+                    throw new AuthenticationException("Base64 decoding is 
failure: " + e.getMessage());
+                }
+            } else {
+                throw new AuthenticationException("Authentication data source 
does not have data");
+            }
+
+            String[] parsedAuthParams = authParams.split(":");
+            if (parsedAuthParams.length != 2) {
+                throw new AuthenticationException("Base64 decoded params are 
invalid");
+            }
+
+            userId = parsedAuthParams[0];
+            password = parsedAuthParams[1];
+        }
+
+        public String getUserId() {
+            return userId;
+        }
+
+        public String getPassword() {
+            return password;
+        }
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
index 83b15f5..7292940 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
@@ -18,32 +18,17 @@
  */
 package org.apache.pulsar.client.api;
 
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.spy;
-
-import java.net.URI;
-import java.util.*;
-import java.util.concurrent.TimeUnit;
-
-import javax.ws.rs.InternalServerErrorException;
-
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderBasic;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConfiguration;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.auth.AuthenticationBasic;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.PropertyAdmin;
-import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
-import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -52,8 +37,13 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
+import javax.ws.rs.InternalServerErrorException;
+import java.net.URI;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
 
 public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
     private static final Logger log = 
LoggerFactory.getLogger(AuthenticatedProducerConsumerTest.class);
@@ -64,6 +54,8 @@ public class AuthenticatedProducerConsumerTest extends 
ProducerConsumerBase {
     private final String TLS_CLIENT_CERT_FILE_PATH = 
"./src/test/resources/authentication/tls/client-cert.pem";
     private final String TLS_CLIENT_KEY_FILE_PATH = 
"./src/test/resources/authentication/tls/client-key.pem";
 
+    private final String BASIC_CONF_FILE_PATH = 
"./src/test/resources/authentication/basic/.htpasswd";
+
     @BeforeMethod
     @Override
     protected void setup() throws Exception {
@@ -83,6 +75,7 @@ public class AuthenticatedProducerConsumerTest extends 
ProducerConsumerBase {
         Set<String> superUserRoles = new HashSet<>();
         superUserRoles.add("localhost");
         superUserRoles.add("superUser");
+        superUserRoles.add("superUser2");
         conf.setSuperUserRoles(superUserRoles);
 
         
conf.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
@@ -91,6 +84,8 @@ public class AuthenticatedProducerConsumerTest extends 
ProducerConsumerBase {
 
         Set<String> providers = new HashSet<>();
         providers.add(AuthenticationProviderTls.class.getName());
+        providers.add(AuthenticationProviderBasic.class.getName());
+        System.setProperty("pulsar.auth.basic.conf", BASIC_CONF_FILE_PATH);
         conf.setAuthenticationProviders(providers);
 
         conf.setClusterName("use");
@@ -107,7 +102,13 @@ public class AuthenticatedProducerConsumerTest extends 
ProducerConsumerBase {
         clientConf.setUseTls(true);
 
         admin = spy(new PulsarAdmin(brokerUrlTls, clientConf));
-        String lookupUrl = new URI("pulsar+ssl://localhost:" + 
BROKER_PORT_TLS).toString();
+        String lookupUrl;
+        // For http basic authentication test
+        if (methodName.equals("testBasicCryptSyncProducerAndConsumer")) {
+            lookupUrl = new URI("https://localhost:"; + 
BROKER_WEBSERVICE_PORT_TLS).toString();
+        } else {
+            lookupUrl = new URI("pulsar+ssl://localhost:" + 
BROKER_PORT_TLS).toString();
+        }
         pulsarClient = PulsarClient.create(lookupUrl, clientConf);
     }
 
@@ -167,8 +168,6 @@ public class AuthenticatedProducerConsumerTest extends 
ProducerConsumerBase {
         authTls.configure(authParams);
         internalSetup(authTls);
 
-        admin.clusters().updateCluster("use", new 
ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
-                "pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" 
+ BROKER_PORT_TLS));
         admin.properties().createProperty("my-property",
                 new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), 
Sets.newHashSet("use")));
         admin.namespaces().createNamespace("my-property/use/my-ns");
@@ -179,6 +178,39 @@ public class AuthenticatedProducerConsumerTest extends 
ProducerConsumerBase {
     }
 
     @Test(dataProvider = "batch")
+    public void testBasicCryptSyncProducerAndConsumer(int batchMessageDelayMs) 
throws Exception {
+        log.info("-- Starting {} test --", methodName);
+        AuthenticationBasic authPassword = new AuthenticationBasic();
+        
authPassword.configure("{\"userId\":\"superUser\",\"password\":\"supepass\"}");
+        internalSetup(authPassword);
+
+        admin.properties()
+                .createProperty("my-property", new 
PropertyAdmin(Lists.newArrayList(), Sets.newHashSet("use")));
+        admin.namespaces().createNamespace("my-property/use/my-ns");
+
+        testSyncProducerAndConsumer(batchMessageDelayMs);
+
+        log.info("-- Exiting {} test --", methodName);
+    }
+
+    @Test(dataProvider = "batch")
+    public void testBasicArp1SyncProducerAndConsumer(int batchMessageDelayMs) 
throws Exception {
+        log.info("-- Starting {} test --", methodName);
+        AuthenticationBasic authPassword = new AuthenticationBasic();
+        
authPassword.configure("{\"userId\":\"superUser2\",\"password\":\"superpassword\"}");
+        internalSetup(authPassword);
+
+        admin.properties()
+                .createProperty("my-property", new 
PropertyAdmin(Lists.newArrayList(), Sets.newHashSet("use")));
+        admin.namespaces().createNamespace("my-property/use/my-ns");
+
+        testSyncProducerAndConsumer(batchMessageDelayMs);
+
+        log.info("-- Exiting {} test --", methodName);
+    }
+
+
+    @Test(dataProvider = "batch")
     public void testAnonymousSyncProducerAndConsumer(int batchMessageDelayMs) 
throws Exception {
         log.info("-- Starting {} test --", methodName);
 
@@ -223,7 +255,7 @@ public class AuthenticatedProducerConsumerTest extends 
ProducerConsumerBase {
 
     /**
      * Verifies: on 500 server error, broker invalidates session and client 
receives 500 correctly.
-     * 
+     *
      * @throws Exception
      */
     @Test
@@ -254,7 +286,7 @@ public class AuthenticatedProducerConsumerTest extends 
ProducerConsumerBase {
     /**
      * verifies that topicLookup/PartitionMetadataLookup gives 
InternalServerError(500) instead 401(auth_failed) on
      * unknown-exception failure
-     * 
+     *
      * @throws Exception
      */
     @Test
@@ -292,4 +324,4 @@ public class AuthenticatedProducerConsumerTest extends 
ProducerConsumerBase {
 
     }
 
-}
\ No newline at end of file
+}
diff --git a/pulsar-broker/src/test/resources/authentication/basic/.htpasswd 
b/pulsar-broker/src/test/resources/authentication/basic/.htpasswd
new file mode 100644
index 0000000..b1a099a
--- /dev/null
+++ b/pulsar-broker/src/test/resources/authentication/basic/.htpasswd
@@ -0,0 +1,2 @@
+superUser:mQQQIsyvvKRtU
+superUser2:$apr1$foobarmq$kuSZlLgOITksCkRgl57ie/
diff --git a/pulsar-client/pom.xml b/pulsar-client/pom.xml
index 1f0f441..3c26a9e 100644
--- a/pulsar-client/pom.xml
+++ b/pulsar-client/pom.xml
@@ -69,6 +69,12 @@
       <groupId>org.bouncycastle</groupId>
       <artifactId>bcpkix-jdk15on</artifactId>
     </dependency>
+
+    <dependency>
+      <groupId>com.google.code.gson</groupId>
+      <artifactId>gson</artifactId>
+    </dependency>
+
   </dependencies>
 
   <build>
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationBasic.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationBasic.java
new file mode 100644
index 0000000..37454d2
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationBasic.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.impl.auth;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.EncodedAuthenticationParameterSupport;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class AuthenticationBasic implements Authentication, 
EncodedAuthenticationParameterSupport {
+    private String userId;
+    private String password;
+
+    @Override
+    public void close() throws IOException {
+        // noop
+    }
+
+    @Override
+    public String getAuthMethodName() {
+        return "basic";
+    }
+
+    @Override
+    public AuthenticationDataProvider getAuthData() throws 
PulsarClientException {
+        try {
+            return new AuthenticationDataBasic(userId, password);
+        } catch (Exception e) {
+            throw new PulsarClientException(e);
+        }
+    }
+
+    @Override
+    public void configure(Map<String, String> authParams) {
+        configure(new Gson().toJson(authParams));
+    }
+
+    @Override
+    public void configure(String encodedAuthParamString) {
+        JsonObject params = new Gson().fromJson(encodedAuthParamString, 
JsonObject.class);
+        userId = params.get("userId").getAsString();
+        password = params.get("password").getAsString();
+    }
+
+    @Override
+    public void start() throws PulsarClientException {
+        // noop
+    }
+
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataBasic.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataBasic.java
new file mode 100644
index 0000000..39b7d5f
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationDataBasic.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.impl.auth;
+
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import java.util.Base64;
+
+public class AuthenticationDataBasic implements AuthenticationDataProvider {
+    private final static String HTTP_HEADER_NAME = "Authorization";
+    private String httpAuthToken;
+    private String commandAuthToken;
+
+    public AuthenticationDataBasic(String userId, String password) {
+        httpAuthToken = "Basic " + Base64.getEncoder().encodeToString((userId 
+ ":" + password).getBytes());
+        commandAuthToken = userId+":"+password;
+    }
+
+    @Override
+    public boolean hasDataForHttp() {
+        return true;
+    }
+
+    @Override
+    public Set<Map.Entry<String, String>> getHttpHeaders() {
+        Map<String, String> headers = new HashMap<>();
+        headers.put(HTTP_HEADER_NAME, httpAuthToken);
+        return headers.entrySet();
+    }
+
+    @Override
+    public boolean hasDataFromCommand() {
+        return true;
+    }
+
+    @Override
+    public String getCommandData() {
+        return commandAuthToken;
+    }
+}

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to