This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit a1fe1299a1b22fb21d3852c1e575f8b80ae7d086 Author: Zike Yang <[email protected]> AuthorDate: Tue Jan 26 09:15:11 2021 +0800 [Authentication]Add authentication metrics (#9244) ### Motivation Currently, if many clients are failing to authenticate, it might not be clear as to the reason why. For JWT auth for example, it could be because the JWT failed to validate, or that a token is expired. In a busy system, these messages are too noisy to log, so we should instead consider metrics that provide info on successful vs failed auths, as well as auth failure reasons ### Modifications * Implement `AuthenticationMetrics` API that allows for instrumenting auth success, and auth failures, with a Prometheus compatible attribute used to distinguish failure reasons. * Add auth metrics instrument to the `basic auth`, `jwt auth`, `tls auth`, `athenz auth` and `sasl auth`. (cherry picked from commit f89dbb6d7604051945359034f9a3e922c78f62da) --- .../AuthenticationProviderAthenz.java | 87 ++++++++++++---------- .../authentication/AuthenticationProviderSasl.java | 16 +++- .../AuthenticationProviderBasic.java | 32 ++++---- .../authentication/AuthenticationProviderTls.java | 81 ++++++++++---------- .../AuthenticationProviderToken.java | 10 ++- .../metrics/AuthenticationMetrics.java | 53 +++++++++++++ .../pulsar/broker/stats/PrometheusMetricsTest.java | 79 +++++++++++++++++++- 7 files changed, 260 insertions(+), 98 deletions(-) diff --git a/pulsar-broker-auth-athenz/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderAthenz.java b/pulsar-broker-auth-athenz/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderAthenz.java index 955a96d..7a164b0 100644 --- a/pulsar-broker-auth-athenz/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderAthenz.java +++ b/pulsar-broker-auth-athenz/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderAthenz.java @@ -26,8 +26,7 @@ import java.security.PublicKey; import javax.naming.AuthenticationException; import org.apache.commons.lang3.StringUtils; -import org.apache.pulsar.broker.authentication.AuthenticationDataSource; -import org.apache.pulsar.broker.authentication.AuthenticationProvider; +import org.apache.pulsar.broker.authentication.metrics.AuthenticationMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,53 +84,59 @@ public class AuthenticationProviderAthenz implements AuthenticationProvider { public String authenticate(AuthenticationDataSource authData) throws AuthenticationException { SocketAddress clientAddress; String roleToken; + try { - if (authData.hasDataFromPeer()) { - clientAddress = authData.getPeerAddress(); - } else { - throw new AuthenticationException("Authentication data source does not have a client address"); - } - - if (authData.hasDataFromCommand()) { - roleToken = authData.getCommandData(); - } else if (authData.hasDataFromHttp()) { - roleToken = authData.getHttpHeader(AuthZpeClient.ZPE_TOKEN_HDR); - } else { - throw new AuthenticationException("Authentication data source does not have a role token"); - } - - if (roleToken == null) { - throw new AuthenticationException("Athenz token is null, can't authenticate"); - } - if (roleToken.isEmpty()) { - throw new AuthenticationException("Athenz RoleToken is empty, Server is Using Athenz Authentication"); - } - if (log.isDebugEnabled()) { - log.debug("Athenz RoleToken : [{}] received from Client: {}", roleToken, clientAddress); - } + if (authData.hasDataFromPeer()) { + clientAddress = authData.getPeerAddress(); + } else { + throw new AuthenticationException("Authentication data source does not have a client address"); + } - RoleToken token = new RoleToken(roleToken); + if (authData.hasDataFromCommand()) { + roleToken = authData.getCommandData(); + } else if (authData.hasDataFromHttp()) { + roleToken = authData.getHttpHeader(AuthZpeClient.ZPE_TOKEN_HDR); + } else { + throw new AuthenticationException("Authentication data source does not have a role token"); + } - if (!domainNameList.contains(token.getDomain())) { - throw new AuthenticationException( - String.format("Athenz RoleToken Domain mismatch, Expected: %s, Found: %s", domainNameList.toString(), token.getDomain())); - } + if (roleToken == null) { + throw new AuthenticationException("Athenz token is null, can't authenticate"); + } + if (roleToken.isEmpty()) { + throw new AuthenticationException("Athenz RoleToken is empty, Server is Using Athenz Authentication"); + } + if (log.isDebugEnabled()) { + log.debug("Athenz RoleToken : [{}] received from Client: {}", roleToken, clientAddress); + } - // Synchronize for non-thread safe static calls inside athenz library - synchronized (this) { - PublicKey ztsPublicKey = AuthZpeClient.getZtsPublicKey(token.getKeyId()); + RoleToken token = new RoleToken(roleToken); - if (ztsPublicKey == null) { - throw new AuthenticationException("Unable to retrieve ZTS Public Key"); + if (!domainNameList.contains(token.getDomain())) { + throw new AuthenticationException( + String.format("Athenz RoleToken Domain mismatch, Expected: %s, Found: %s", domainNameList.toString(), token.getDomain())); } - if (token.validate(ztsPublicKey, allowedOffset, false, null)) { - log.debug("Athenz Role Token : {}, Authenticated for Client: {}", roleToken, clientAddress); - return token.getPrincipal(); - } else { - throw new AuthenticationException( - String.format("Athenz Role Token Not Authenticated from Client: %s", clientAddress)); + // Synchronize for non-thread safe static calls inside athenz library + synchronized (this) { + PublicKey ztsPublicKey = AuthZpeClient.getZtsPublicKey(token.getKeyId()); + + if (ztsPublicKey == null) { + throw new AuthenticationException("Unable to retrieve ZTS Public Key"); + } + + if (token.validate(ztsPublicKey, allowedOffset, false, null)) { + log.debug("Athenz Role Token : {}, Authenticated for Client: {}", roleToken, clientAddress); + AuthenticationMetrics.authenticateSuccess(getClass().getSimpleName(), getAuthMethodName()); + return token.getPrincipal(); + } else { + throw new AuthenticationException( + String.format("Athenz Role Token Not Authenticated from Client: %s", clientAddress)); + } } + } catch (AuthenticationException exception) { + AuthenticationMetrics.authenticateFailure(getClass().getSimpleName(), getAuthMethodName(), exception.getMessage()); + throw exception; } } diff --git a/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java b/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java index 32ab71c..1791ea2 100644 --- a/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java +++ b/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java @@ -53,6 +53,7 @@ import com.google.common.collect.Maps; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.authentication.metrics.AuthenticationMetrics; import org.apache.pulsar.common.api.AuthData; import org.apache.pulsar.common.sasl.JAASCredentialsContainer; import org.apache.pulsar.common.sasl.SaslConstants; @@ -100,11 +101,18 @@ public class AuthenticationProviderSasl implements AuthenticationProvider { @Override public String authenticate(AuthenticationDataSource authData) throws AuthenticationException { - if (authData instanceof SaslAuthenticationDataSource) { - return ((SaslAuthenticationDataSource)authData).getAuthorizationID(); - } else { - throw new AuthenticationException("Not support authDataSource type, expect sasl."); + try { + if (authData instanceof SaslAuthenticationDataSource) { + AuthenticationMetrics.authenticateSuccess(getClass().getSimpleName(), getAuthMethodName()); + return ((SaslAuthenticationDataSource) authData).getAuthorizationID(); + } else { + throw new AuthenticationException("Not support authDataSource type, expect sasl."); + } + } catch (AuthenticationException exception) { + AuthenticationMetrics.authenticateFailure(getClass().getSimpleName(), getAuthMethodName(), exception.getMessage()); + throw exception; } + } @Override diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderBasic.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderBasic.java index faa8718..48d867f 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderBasic.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderBasic.java @@ -25,6 +25,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.ServiceConfiguration; import lombok.Cleanup; +import org.apache.pulsar.broker.authentication.metrics.AuthenticationMetrics; import javax.naming.AuthenticationException; import java.io.BufferedReader; @@ -75,24 +76,29 @@ public class AuthenticationProviderBasic implements AuthenticationProvider { String password = authParams.getPassword(); String msg = "Unknown user or invalid password"; - if (users.get(userId) == null) { - throw new AuthenticationException(msg); - } + try { + if (users.get(userId) == null) { + throw new AuthenticationException(msg); + } - String encryptedPassword = users.get(userId); + String encryptedPassword = users.get(userId); - // For md5 algorithm - if ((users.get(userId).startsWith("$apr1"))) { - List<String> splitEncryptedPassword = Arrays.asList(encryptedPassword.split("\\$")); - if (splitEncryptedPassword.size() != 4 || !encryptedPassword - .equals(Md5Crypt.apr1Crypt(password.getBytes(), splitEncryptedPassword.get(2)))) { + // For md5 algorithm + if ((users.get(userId).startsWith("$apr1"))) { + List<String> splitEncryptedPassword = Arrays.asList(encryptedPassword.split("\\$")); + if (splitEncryptedPassword.size() != 4 || !encryptedPassword + .equals(Md5Crypt.apr1Crypt(password.getBytes(), splitEncryptedPassword.get(2)))) { + throw new AuthenticationException(msg); + } + // For crypt algorithm + } else if (!encryptedPassword.equals(Crypt.crypt(password.getBytes(), encryptedPassword.substring(0, 2)))) { throw new AuthenticationException(msg); } - // For crypt algorithm - } else if (!encryptedPassword.equals(Crypt.crypt(password.getBytes(), encryptedPassword.substring(0, 2)))) { - throw new AuthenticationException(msg); + } catch (AuthenticationException exception) { + AuthenticationMetrics.authenticateFailure(getClass().getSimpleName(), getAuthMethodName(), exception.getMessage()); + throw exception; } - + AuthenticationMetrics.authenticateSuccess(getClass().getSimpleName(), getAuthMethodName()); return userId; } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTls.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTls.java index de04c8a..5747a82 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTls.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTls.java @@ -25,6 +25,7 @@ import java.security.cert.X509Certificate; import javax.naming.AuthenticationException; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.authentication.metrics.AuthenticationMetrics; public class AuthenticationProviderTls implements AuthenticationProvider { @@ -46,49 +47,53 @@ public class AuthenticationProviderTls implements AuthenticationProvider { @Override public String authenticate(AuthenticationDataSource authData) throws AuthenticationException { String commonName = null; + try { + if (authData.hasDataFromTls()) { + /** + * Maybe authentication type should be checked if it is an HTTPS session. However this check fails actually + * because authType is null. + * + * This check is not necessarily needed, because an untrusted certificate is not passed to + * HttpServletRequest. + * + * <code> + * if (authData.hasDataFromHttp()) { + * String authType = authData.getHttpAuthType(); + * if (!HttpServletRequest.CLIENT_CERT_AUTH.equals(authType)) { + * throw new AuthenticationException( + * String.format( "Authentication type mismatch, Expected: %s, Found: %s", + * HttpServletRequest.CLIENT_CERT_AUTH, authType)); + * } + * } + * </code> + */ - if (authData.hasDataFromTls()) { - /** - * Maybe authentication type should be checked if it is an HTTPS session. However this check fails actually - * because authType is null. - * - * This check is not necessarily needed, because an untrusted certificate is not passed to - * HttpServletRequest. - * - * <code> - * if (authData.hasDataFromHttp()) { - * String authType = authData.getHttpAuthType(); - * if (!HttpServletRequest.CLIENT_CERT_AUTH.equals(authType)) { - * throw new AuthenticationException( - * String.format( "Authentication type mismatch, Expected: %s, Found: %s", - * HttpServletRequest.CLIENT_CERT_AUTH, authType)); - * } - * } - * </code> - */ - - // Extract CommonName - // The format is defined in RFC 2253. - // Example: - // CN=Steve Kille,O=Isode Limited,C=GB - Certificate[] certs = authData.getTlsCertificates(); - if (null == certs) { - throw new AuthenticationException("Failed to get TLS certificates from client"); - } - String distinguishedName = ((X509Certificate) certs[0]).getSubjectX500Principal().getName(); - for (String keyValueStr : distinguishedName.split(",")) { - String[] keyValue = keyValueStr.split("=", 2); - if (keyValue.length == 2 && "CN".equals(keyValue[0]) && !keyValue[1].isEmpty()) { - commonName = keyValue[1]; - break; + // Extract CommonName + // The format is defined in RFC 2253. + // Example: + // CN=Steve Kille,O=Isode Limited,C=GB + Certificate[] certs = authData.getTlsCertificates(); + if (null == certs) { + throw new AuthenticationException("Failed to get TLS certificates from client"); + } + String distinguishedName = ((X509Certificate) certs[0]).getSubjectX500Principal().getName(); + for (String keyValueStr : distinguishedName.split(",")) { + String[] keyValue = keyValueStr.split("=", 2); + if (keyValue.length == 2 && "CN".equals(keyValue[0]) && !keyValue[1].isEmpty()) { + commonName = keyValue[1]; + break; + } } } - } - if (commonName == null) { - throw new AuthenticationException("Client unable to authenticate with TLS certificate"); + if (commonName == null) { + throw new AuthenticationException("Client unable to authenticate with TLS certificate"); + } + AuthenticationMetrics.authenticateSuccess(getClass().getSimpleName(), getAuthMethodName()); + } catch (AuthenticationException exception) { + AuthenticationMetrics.authenticateFailure(getClass().getSimpleName(), getAuthMethodName(), exception.getMessage()); + throw exception; } - return commonName; } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java index 66971e3..f2e366f 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java @@ -30,6 +30,7 @@ import javax.net.ssl.SSLSession; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.authentication.metrics.AuthenticationMetrics; import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils; import org.apache.pulsar.common.api.AuthData; @@ -121,7 +122,14 @@ public class AuthenticationProviderToken implements AuthenticationProvider { @Override public String authenticate(AuthenticationDataSource authData) throws AuthenticationException { // Get Token - String token = getToken(authData); + String token; + try { + token = getToken(authData); + AuthenticationMetrics.authenticateSuccess(getClass().getSimpleName(), getAuthMethodName()); + } catch (AuthenticationException exception) { + AuthenticationMetrics.authenticateFailure(getClass().getSimpleName(), getAuthMethodName(), exception.getMessage()); + throw exception; + } // Parse Token by validating return getPrincipal(authenticateToken(token)); diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/metrics/AuthenticationMetrics.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/metrics/AuthenticationMetrics.java new file mode 100644 index 0000000..1fd803f --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/metrics/AuthenticationMetrics.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.authentication.metrics; + +import io.prometheus.client.Counter; + +public class AuthenticationMetrics { + private static final Counter authSuccessMetrics = Counter.build() + .name("pulsar_authentication_success_count") + .help("Pulsar authentication success") + .labelNames("provider_name", "auth_method") + .register(); + private static final Counter authFailuresMetrics = Counter.build() + .name("pulsar_authentication_failures_count") + .help("Pulsar authentication failures") + .labelNames("provider_name", "auth_method", "reason") + .register(); + + /** + * Log authenticate success event to the authentication metrics + * @param providerName The short class name of the provider + * @param authMethod Authentication method name + */ + public static void authenticateSuccess(String providerName, String authMethod) { + authSuccessMetrics.labels(providerName, authMethod).inc(); + } + + /** + * Log authenticate failure event to the authentication metrics. + * @param providerName The short class name of the provider + * @param authMethod Authentication method name. + * @param reason Failure reason. + */ + public static void authenticateFailure(String providerName, String authMethod, String reason) { + authFailuresMetrics.labels(providerName, authMethod, reason).inc(); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index cba6d1e..58ed618 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -24,21 +24,40 @@ import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import java.io.BufferedReader; import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.StringWriter; import java.lang.reflect.Field; import java.math.RoundingMode; import java.text.NumberFormat; import java.util.Arrays; import java.util.Collection; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.HashMap; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; - +import io.jsonwebtoken.SignatureAlgorithm; +import io.prometheus.client.CollectorRegistry; +import io.prometheus.client.exporter.common.TextFormat; +import org.apache.http.HttpResponse; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.authentication.AuthenticationDataSource; +import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; +import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils; import org.apache.pulsar.broker.service.BrokerTestBase; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor; @@ -57,6 +76,9 @@ import com.google.common.base.Splitter; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Multimap; +import javax.crypto.SecretKey; +import javax.naming.AuthenticationException; + public class PrometheusMetricsTest extends BrokerTestBase { @BeforeMethod @@ -694,6 +716,61 @@ public class PrometheusMetricsTest extends BrokerTestBase { p2.close(); } + @Test + public void testAuthMetrics() throws IOException, AuthenticationException { + SecretKey secretKey = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256); + + AuthenticationProviderToken provider = new AuthenticationProviderToken(); + + Properties properties = new Properties(); + properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(secretKey)); + + ServiceConfiguration conf = new ServiceConfiguration(); + conf.setProperties(properties); + provider.initialize(conf); + + String authExceptionMessage = ""; + + try { + provider.authenticate(new AuthenticationDataSource() { + }); + fail("Should have failed"); + } catch (AuthenticationException e) { + // expected, no credential passed + authExceptionMessage = e.getMessage(); + } + + String token = AuthTokenUtils.createToken(secretKey, "subject", Optional.empty()); + + // Pulsar protocol auth + String subject = provider.authenticate(new AuthenticationDataSource() { + @Override + public boolean hasDataFromCommand() { + return true; + } + + @Override + public String getCommandData() { + return token; + } + }); + + ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); + PrometheusMetricsGenerator.generate(pulsar, false, false, statsOut); + String metricsStr = new String(statsOut.toByteArray()); + Multimap<String, Metric> metrics = parseMetrics(metricsStr); + List<Metric> cm = (List<Metric>) metrics.get("pulsar_authentication_success_count"); + Metric metric = cm.get(cm.size() - 1); + assertEquals(metric.tags.get("auth_method"), "token"); + assertEquals(metric.tags.get("provider_name"), provider.getClass().getSimpleName()); + + cm = (List<Metric>) metrics.get("pulsar_authentication_failures_count"); + metric = cm.get(cm.size() - 1); + assertEquals(metric.tags.get("auth_method"), "token"); + assertEquals(metric.tags.get("reason"), authExceptionMessage); + assertEquals(metric.tags.get("provider_name"), provider.getClass().getSimpleName()); + } + /** * Hacky parsing of Prometheus text format. Sould be good enough for unit tests */
