This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 517cdce9bbf2680c13d7553b4ad02fff67fced27 Author: kannar <kanna...@gmail.com> AuthorDate: Tue Sep 23 18:07:26 2025 +0200 [improve][broker] PIP-402: Optionally prevent role/originalPrincipal logging (#23386) (cherry picked from commit 7c8d3c9210dcaf1d0c3b487af325b7b0a9dc3095) --- conf/broker.conf | 3 + conf/proxy.conf | 3 + conf/standalone.conf | 3 + .../apache/pulsar/broker/ServiceConfiguration.java | 8 +++ ...DefaultAuthenticationRoleLoggingAnonymizer.java | 67 ++++++++++++++++++ .../anonymizer/DefaultRoleAnonymizerType.java | 80 ++++++++++++++++++++++ .../configuration/anonymizer/package-info.java | 23 +++++++ .../apache/pulsar/broker/service/ServerCnx.java | 21 ++++-- .../pulsar/proxy/server/ProxyConfiguration.java | 7 ++ .../pulsar/proxy/server/ProxyConnection.java | 18 +++-- 10 files changed, 220 insertions(+), 13 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 11270ae081e..2e8527edb47 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -909,6 +909,9 @@ metadataStoreBatchingMaxSizeKb=128 # Enable authentication authenticationEnabled=false +# Enable authentication role anonymizer, can be REDACTED, hash:SHA256, hash:MD5, default is NONE +authenticationRoleLoggingAnonymizer=NONE + # Authentication provider name list, which is comma separated list of class names authenticationProviders= diff --git a/conf/proxy.conf b/conf/proxy.conf index 46d84744e12..dc1fc5f002f 100644 --- a/conf/proxy.conf +++ b/conf/proxy.conf @@ -184,6 +184,9 @@ forwardAuthorizationCredentials=false # Whether authentication is enabled for the Pulsar proxy authenticationEnabled=false +# Enable authentication role anonymizer, can be REDACTED, hash:SHA256, hash:MD5, default is NONE +authenticationRoleLoggingAnonymizer=NONE + # Authentication provider name list (a comma-separated list of class names) authenticationProviders= diff --git a/conf/standalone.conf b/conf/standalone.conf index 2b205a2b2f6..708d4905b8a 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -572,6 +572,9 @@ authenticateOriginalAuthData=false # Enable authentication authenticationEnabled=false +# Enable authentication role anonymizer, can be REDACTED, hash:SHA256, hash:MD5, default is NONE +authenticationRoleLoggingAnonymizer=NONE + # Authentication provider name list, which is comma separated list of class names authenticationProviders= diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 279ed9ed73e..5d2d94ea477 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -445,6 +445,14 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private String clusterName; + @FieldContext( + category = CATEGORY_SERVER, + doc = "Defines how the broker will anonymize the role and originalAuthRole before logging. " + + "Possible values are: NONE (no anonymization), REDACTED (replaces with '[REDACTED]'), " + + "hash:SHA256 (hashes using SHA-256), and hash:MD5 (hashes using MD5). Default is NONE." + ) + private String authenticationRoleLoggingAnonymizer = "NONE"; + @FieldContext( category = CATEGORY_SERVER, dynamic = true, diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/anonymizer/DefaultAuthenticationRoleLoggingAnonymizer.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/anonymizer/DefaultAuthenticationRoleLoggingAnonymizer.java new file mode 100644 index 00000000000..151118c6d5e --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/anonymizer/DefaultAuthenticationRoleLoggingAnonymizer.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.configuration.anonymizer; + +import static org.apache.pulsar.common.configuration.anonymizer.DefaultRoleAnonymizerType.NONE; + +/** + * This class provides a utility to anonymize authentication roles before logging, + * based on a configured anonymization strategy. The anonymization strategy is + * determined by the provided value of the {@link DefaultRoleAnonymizerType} enum. + * + * The primary purpose of this class is to enable flexible anonymization of sensitive + * information (such as user roles) in logs, ensuring compliance with security and + * privacy requirements while allowing customization of the anonymization behavior. + * + * Usage: + * - The class constructor accepts a string that represents the desired anonymization + * strategy (e.g., "NONE", "REDACTED", "SHA256", "MD5"), and it initializes the + * anonymizer type accordingly. + * - The {@code anonymize} method applies the selected anonymization strategy to + * the provided role and returns the anonymized value. + * + * Example: + * <pre> + * DefaultAuthenticationRoleLoggingAnonymizer roleAnonymizer = + * new DefaultAuthenticationRoleLoggingAnonymizer("SHA256"); + * String anonymizedRole = roleAnonymizer.anonymize("admin"); + * </pre> + * + * Anonymization strategies: + * - NONE: No anonymization (returns the role as-is). + * - REDACTED: Replaces the role with "[REDACTED]". + * - hash:SHA256: Hashes the role using the SHA-256 algorithm and prefixes it with "SHA-256:". + * - hash:MD5: Hashes the role using the MD5 algorithm and prefixes it with "MD5:" + */ +public final class DefaultAuthenticationRoleLoggingAnonymizer { + private static DefaultRoleAnonymizerType anonymizerType = NONE; + + public DefaultAuthenticationRoleLoggingAnonymizer(String authenticationRoleLoggingAnonymizer) { + if (authenticationRoleLoggingAnonymizer.startsWith("hash:")) { + anonymizerType = DefaultRoleAnonymizerType.valueOf(authenticationRoleLoggingAnonymizer + .substring("hash:".length()).toUpperCase()); + } else { + anonymizerType = DefaultRoleAnonymizerType.valueOf(authenticationRoleLoggingAnonymizer); + } + } + + public String anonymize(String role) { + return anonymizerType.anonymize(role); + } +} diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/anonymizer/DefaultRoleAnonymizerType.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/anonymizer/DefaultRoleAnonymizerType.java new file mode 100644 index 00000000000..3333b69cf22 --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/anonymizer/DefaultRoleAnonymizerType.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.configuration.anonymizer; + +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.Base64; + +public enum DefaultRoleAnonymizerType { + NONE { + @Override + public String anonymize(String role) { + return role; + } + }, + REDACTED { + @Override + public String anonymize(String role) { + return REDACTED_VALUE; + } + }, + SHA256 { + private static final String PREFIX = "SHA-256:"; + private final MessageDigest digest; + + { + // Initializing the MessageDigest once for SHA-256 + try { + digest = MessageDigest.getInstance("SHA-256"); + } catch (NoSuchAlgorithmException e) { + throw new RuntimeException("SHA-256 algorithm not found", e); + } + } + + @Override + public String anonymize(String role) { + byte[] hash = digest.digest(role.getBytes()); + return PREFIX + Base64.getEncoder().encodeToString(hash); + } + }, + MD5 { + private static final String PREFIX = "MD5:"; + private final MessageDigest digest; + + { + // Initializing the MessageDigest once for MD5 + try { + // codeql[java/weak-cryptographic-algorithm] - md5 is sufficient for this use case& + digest = MessageDigest.getInstance("MD5"); + } catch (NoSuchAlgorithmException e) { + throw new RuntimeException("MD5 algorithm not found", e); + } + } + + @Override + public String anonymize(String role) { + byte[] hash = digest.digest(role.getBytes()); + return PREFIX + Base64.getEncoder().encodeToString(hash); + } + }; + + private static final String REDACTED_VALUE = "[REDACTED]"; + public abstract String anonymize(String role); +} \ No newline at end of file diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/anonymizer/package-info.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/anonymizer/package-info.java new file mode 100644 index 00000000000..7b1c506bf72 --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/anonymizer/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Pulsar Client API. + */ +package org.apache.pulsar.common.configuration.anonymizer; + diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 44927c375b5..1c61d4c467f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -148,6 +148,7 @@ import org.apache.pulsar.common.api.proto.ProtocolVersion; import org.apache.pulsar.common.api.proto.Schema; import org.apache.pulsar.common.api.proto.ServerError; import org.apache.pulsar.common.api.proto.TxnAction; +import org.apache.pulsar.common.configuration.anonymizer.DefaultAuthenticationRoleLoggingAnonymizer; import org.apache.pulsar.common.intercept.InterceptException; import org.apache.pulsar.common.lookup.data.LookupData; import org.apache.pulsar.common.naming.Metadata; @@ -215,6 +216,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { private AuthData originalAuthDataCopy; private boolean pendingAuthChallengeResponse = false; private ScheduledFuture<?> authRefreshTask; + private final DefaultAuthenticationRoleLoggingAnonymizer authenticationRoleLoggingAnonymizer; // Max number of pending requests per connections. If multiple producers are sharing the same connection the flow // control done by a single producer might not be enough to prevent write spikes on the broker. @@ -352,6 +354,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { this.brokerInterceptor = this.service != null ? this.service.getInterceptor() : null; this.throttleTracker = new ServerCnxThrottleTracker(this); topicsPatternImplementation = conf.getTopicsPatternRegexImplementation(); + this.authenticationRoleLoggingAnonymizer = new DefaultAuthenticationRoleLoggingAnonymizer( + conf.getAuthenticationRoleLoggingAnonymizer()); } @Override @@ -821,12 +825,14 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { clientVersion, clientProtoVersion, proxyVersion); } else if (originalPrincipal != null) { log.info("[{}] connected role={} and originalAuthRole={} using authMethod={}, clientVersion={}, " - + "clientProtocolVersion={}, proxyVersion={}", remoteAddress, authRole, originalPrincipal, - authMethod, clientVersion, clientProtoVersion, proxyVersion); + + "clientProtocolVersion={}, proxyVersion={}", remoteAddress, + authenticationRoleLoggingAnonymizer.anonymize(authRole), + authenticationRoleLoggingAnonymizer.anonymize(originalPrincipal), authMethod, clientVersion, + clientProtoVersion, proxyVersion); } else { log.info("[{}] connected with role={} using authMethod={}, clientVersion={}, clientProtocolVersion={}, " - + "proxyVersion={}", remoteAddress, authRole, authMethod, clientVersion, clientProtoVersion, - proxyVersion); + + "proxyVersion={}", remoteAddress, authenticationRoleLoggingAnonymizer.anonymize(authRole), + authMethod, clientVersion, clientProtoVersion, proxyVersion); } if (brokerInterceptor != null) { brokerInterceptor.onConnectionCreated(this); @@ -1214,7 +1220,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { if (log.isDebugEnabled()) { log.debug("[{}] Handle subscribe command: auth role = {}, original auth role = {}", - remoteAddress, authRole, originalPrincipal); + remoteAddress, authenticationRoleLoggingAnonymizer.anonymize(authRole), + authenticationRoleLoggingAnonymizer.anonymize(originalPrincipal)); } final String subscriptionName = subscribe.getSubscription(); @@ -2433,11 +2440,11 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { return isProxyAuthorizedFuture.thenCombine(isAuthorizedFuture, (isProxyAuthorized, isAuthorized) -> { if (!isProxyAuthorized) { log.warn("OriginalRole {} is not authorized to perform operation {} on namespace {}", - originalPrincipal, operation, namespaceName); + authenticationRoleLoggingAnonymizer.anonymize(originalPrincipal), operation, namespaceName); } if (!isAuthorized) { log.warn("Role {} is not authorized to perform operation {} on namespace {}", - authRole, operation, namespaceName); + authenticationRoleLoggingAnonymizer.anonymize(authRole), operation, namespaceName); } return isProxyAuthorized && isAuthorized; }); diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java index d89801d360b..6db1b302c66 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java @@ -295,6 +295,13 @@ public class ProxyConfiguration implements PulsarConfiguration { + "is enabled.") private Boolean webServiceLogDetailedAddresses; + @FieldContext(category = CATEGORY_SERVER, doc = + "Defines how the broker will anonymize the role and originalAuthRole before logging. " + + "Possible values are: NONE (no anonymization), REDACTED (replaces with '[REDACTED]'), " + + "hash:SHA256 (hashes using SHA-256), and hash:MD5 (hashes using MD5). Default is NONE." + ) + private String authenticationRoleLoggingAnonymizer = "NONE"; + @FieldContext(category = CATEGORY_SERVER, doc = "Enables zero-copy transport of data across network interfaces using the spice. " + "Zero copy mode cannot be used when TLS is enabled or when proxyLogLevel is > 0.") diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java index 4f89f4bc17e..372d45ffe60 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java @@ -72,6 +72,7 @@ import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata; import org.apache.pulsar.common.api.proto.FeatureFlags; import org.apache.pulsar.common.api.proto.ProtocolVersion; import org.apache.pulsar.common.api.proto.ServerError; +import org.apache.pulsar.common.configuration.anonymizer.DefaultAuthenticationRoleLoggingAnonymizer; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.PulsarHandler; import org.apache.pulsar.common.util.Runnables; @@ -120,6 +121,7 @@ public class ProxyConnection extends PulsarHandler { private int protocolVersionToAdvertise; private String proxyToBrokerUrl; private HAProxyMessage haProxyMessage; + private final DefaultAuthenticationRoleLoggingAnonymizer authenticationRoleLoggingAnonymizer; protected static final Integer SPLICE_BYTES = 1024 * 1024 * 1024; private static final byte[] EMPTY_CREDENTIALS = new byte[0]; @@ -161,6 +163,8 @@ public class ProxyConnection extends PulsarHandler { this.state = State.Init; this.brokerProxyValidator = service.getBrokerProxyValidator(); this.connectionController = proxyService.getConnectionController(); + this.authenticationRoleLoggingAnonymizer = new DefaultAuthenticationRoleLoggingAnonymizer( + proxyService.getConfiguration().getAuthenticationRoleLoggingAnonymizer()); } @Override @@ -343,8 +347,9 @@ public class ProxyConnection extends PulsarHandler { private synchronized void completeConnect() throws PulsarClientException { checkArgument(state == State.Connecting); + String maybeAnonymizedClientAuthRole = authenticationRoleLoggingAnonymizer.anonymize(clientAuthRole); LOG.info("[{}] complete connection, init proxy handler. authenticated with {} role {}, hasProxyToBrokerUrl: {}", - remoteAddress, authMethod, clientAuthRole, hasProxyToBrokerUrl); + remoteAddress, authMethod, maybeAnonymizedClientAuthRole, hasProxyToBrokerUrl); if (hasProxyToBrokerUrl) { // Optimize proxy connection to fail-fast if the target broker isn't active // Pulsar client will retry connecting after a back off timeout @@ -352,7 +357,7 @@ public class ProxyConnection extends PulsarHandler { && !isBrokerActive(proxyToBrokerUrl)) { state = State.Closing; LOG.warn("[{}] Target broker '{}' isn't available. authenticated with {} role {}.", - remoteAddress, proxyToBrokerUrl, authMethod, clientAuthRole); + remoteAddress, proxyToBrokerUrl, authMethod, maybeAnonymizedClientAuthRole); final ByteBuf msg = Commands.newError(-1, ServerError.ServiceNotReady, "Target broker isn't available."); writeAndFlushAndClose(msg); @@ -371,10 +376,11 @@ public class ProxyConnection extends PulsarHandler { LOG.warn("[{}] Target broker '{}' cannot be validated. {}. authenticated with {} role {}.", remoteAddress, proxyToBrokerUrl, targetAddressDeniedException.getMessage(), - authMethod, clientAuthRole); + authMethod, maybeAnonymizedClientAuthRole); } else { LOG.error("[{}] Error validating target broker '{}'. authenticated with {} role {}.", - remoteAddress, proxyToBrokerUrl, authMethod, clientAuthRole, throwable); + remoteAddress, proxyToBrokerUrl, authMethod, maybeAnonymizedClientAuthRole, + throwable); } final ByteBuf msg = Commands.newError(-1, ServerError.ServiceNotReady, "Target broker cannot be validated."); @@ -401,7 +407,7 @@ public class ProxyConnection extends PulsarHandler { Optional.of(dnsAddressResolverGroup.getResolver(service.getWorkerGroup().next())), null); } else { LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {} role {}", - remoteAddress, state, clientAuthRole); + remoteAddress, state, maybeAnonymizedClientAuthRole); } state = State.ProxyLookupRequests; @@ -488,7 +494,7 @@ public class ProxyConnection extends PulsarHandler { clientAuthRole = authState.getAuthRole(); if (LOG.isDebugEnabled()) { LOG.debug("[{}] Client successfully authenticated with {} role {}", - remoteAddress, authMethod, clientAuthRole); + remoteAddress, authMethod, authenticationRoleLoggingAnonymizer.anonymize(clientAuthRole)); } // First connection