This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 517cdce9bbf2680c13d7553b4ad02fff67fced27
Author: kannar <kanna...@gmail.com>
AuthorDate: Tue Sep 23 18:07:26 2025 +0200

    [improve][broker] PIP-402: Optionally prevent role/originalPrincipal 
logging (#23386)
    
    (cherry picked from commit 7c8d3c9210dcaf1d0c3b487af325b7b0a9dc3095)
---
 conf/broker.conf                                   |  3 +
 conf/proxy.conf                                    |  3 +
 conf/standalone.conf                               |  3 +
 .../apache/pulsar/broker/ServiceConfiguration.java |  8 +++
 ...DefaultAuthenticationRoleLoggingAnonymizer.java | 67 ++++++++++++++++++
 .../anonymizer/DefaultRoleAnonymizerType.java      | 80 ++++++++++++++++++++++
 .../configuration/anonymizer/package-info.java     | 23 +++++++
 .../apache/pulsar/broker/service/ServerCnx.java    | 21 ++++--
 .../pulsar/proxy/server/ProxyConfiguration.java    |  7 ++
 .../pulsar/proxy/server/ProxyConnection.java       | 18 +++--
 10 files changed, 220 insertions(+), 13 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 11270ae081e..2e8527edb47 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -909,6 +909,9 @@ metadataStoreBatchingMaxSizeKb=128
 # Enable authentication
 authenticationEnabled=false
 
+# Enable authentication role anonymizer, can be REDACTED, hash:SHA256, 
hash:MD5, default is NONE
+authenticationRoleLoggingAnonymizer=NONE
+
 # Authentication provider name list, which is comma separated list of class 
names
 authenticationProviders=
 
diff --git a/conf/proxy.conf b/conf/proxy.conf
index 46d84744e12..dc1fc5f002f 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -184,6 +184,9 @@ forwardAuthorizationCredentials=false
 # Whether authentication is enabled for the Pulsar proxy
 authenticationEnabled=false
 
+# Enable authentication role anonymizer, can be REDACTED, hash:SHA256, 
hash:MD5, default is NONE
+authenticationRoleLoggingAnonymizer=NONE
+
 # Authentication provider name list (a comma-separated list of class names)
 authenticationProviders=
 
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 2b205a2b2f6..708d4905b8a 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -572,6 +572,9 @@ authenticateOriginalAuthData=false
 # Enable authentication
 authenticationEnabled=false
 
+# Enable authentication role anonymizer, can be REDACTED, hash:SHA256, 
hash:MD5, default is NONE
+authenticationRoleLoggingAnonymizer=NONE
+
 # Authentication provider name list, which is comma separated list of class 
names
 authenticationProviders=
 
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 279ed9ed73e..5d2d94ea477 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -445,6 +445,14 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     )
     private String clusterName;
 
+    @FieldContext(
+            category = CATEGORY_SERVER,
+            doc = "Defines how the broker will anonymize the role and 
originalAuthRole before logging. "
+                    + "Possible values are: NONE (no anonymization), REDACTED 
(replaces with '[REDACTED]'), "
+                    + "hash:SHA256 (hashes using SHA-256), and hash:MD5 
(hashes using MD5). Default is NONE."
+    )
+    private String authenticationRoleLoggingAnonymizer = "NONE";
+
     @FieldContext(
         category = CATEGORY_SERVER,
         dynamic = true,
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/anonymizer/DefaultAuthenticationRoleLoggingAnonymizer.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/anonymizer/DefaultAuthenticationRoleLoggingAnonymizer.java
new file mode 100644
index 00000000000..151118c6d5e
--- /dev/null
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/anonymizer/DefaultAuthenticationRoleLoggingAnonymizer.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.configuration.anonymizer;
+
+import static 
org.apache.pulsar.common.configuration.anonymizer.DefaultRoleAnonymizerType.NONE;
+
+/**
+ * This class provides a utility to anonymize authentication roles before 
logging,
+ * based on a configured anonymization strategy. The anonymization strategy is
+ * determined by the provided value of the {@link DefaultRoleAnonymizerType} 
enum.
+ *
+ * The primary purpose of this class is to enable flexible anonymization of 
sensitive
+ * information (such as user roles) in logs, ensuring compliance with security 
and
+ * privacy requirements while allowing customization of the anonymization 
behavior.
+ *
+ * Usage:
+ * - The class constructor accepts a string that represents the desired 
anonymization
+ *   strategy (e.g., "NONE", "REDACTED", "SHA256", "MD5"), and it initializes 
the
+ *   anonymizer type accordingly.
+ * - The {@code anonymize} method applies the selected anonymization strategy 
to
+ *   the provided role and returns the anonymized value.
+ *
+ * Example:
+ * <pre>
+ * DefaultAuthenticationRoleLoggingAnonymizer roleAnonymizer =
+ *     new DefaultAuthenticationRoleLoggingAnonymizer("SHA256");
+ * String anonymizedRole = roleAnonymizer.anonymize("admin");
+ * </pre>
+ *
+ * Anonymization strategies:
+ * - NONE: No anonymization (returns the role as-is).
+ * - REDACTED: Replaces the role with "[REDACTED]".
+ * - hash:SHA256: Hashes the role using the SHA-256 algorithm and prefixes it 
with "SHA-256:".
+ * - hash:MD5: Hashes the role using the MD5 algorithm and prefixes it with 
"MD5:"
+ */
+public final class DefaultAuthenticationRoleLoggingAnonymizer {
+   private static DefaultRoleAnonymizerType anonymizerType = NONE;
+
+   public DefaultAuthenticationRoleLoggingAnonymizer(String 
authenticationRoleLoggingAnonymizer) {
+      if (authenticationRoleLoggingAnonymizer.startsWith("hash:")) {
+         anonymizerType = 
DefaultRoleAnonymizerType.valueOf(authenticationRoleLoggingAnonymizer
+                 .substring("hash:".length()).toUpperCase());
+      } else {
+         anonymizerType = 
DefaultRoleAnonymizerType.valueOf(authenticationRoleLoggingAnonymizer);
+      }
+   }
+
+   public String anonymize(String role) {
+      return anonymizerType.anonymize(role);
+   }
+}
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/anonymizer/DefaultRoleAnonymizerType.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/anonymizer/DefaultRoleAnonymizerType.java
new file mode 100644
index 00000000000..3333b69cf22
--- /dev/null
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/anonymizer/DefaultRoleAnonymizerType.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.configuration.anonymizer;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Base64;
+
+public enum DefaultRoleAnonymizerType {
+   NONE {
+      @Override
+      public String anonymize(String role) {
+         return role;
+      }
+   },
+   REDACTED {
+      @Override
+      public String anonymize(String role) {
+         return REDACTED_VALUE;
+      }
+   },
+   SHA256 {
+      private static final String PREFIX = "SHA-256:";
+      private final MessageDigest digest;
+
+      {
+         // Initializing the MessageDigest once for SHA-256
+         try {
+            digest = MessageDigest.getInstance("SHA-256");
+         } catch (NoSuchAlgorithmException e) {
+            throw new RuntimeException("SHA-256 algorithm not found", e);
+         }
+      }
+
+      @Override
+      public String anonymize(String role) {
+         byte[] hash = digest.digest(role.getBytes());
+         return PREFIX + Base64.getEncoder().encodeToString(hash);
+      }
+   },
+   MD5 {
+      private static final String PREFIX = "MD5:";
+      private final MessageDigest digest;
+
+      {
+         // Initializing the MessageDigest once for MD5
+         try {
+            // codeql[java/weak-cryptographic-algorithm] - md5 is sufficient 
for this use case&
+            digest = MessageDigest.getInstance("MD5");
+         } catch (NoSuchAlgorithmException e) {
+            throw new RuntimeException("MD5 algorithm not found", e);
+         }
+      }
+
+      @Override
+      public String anonymize(String role) {
+         byte[] hash = digest.digest(role.getBytes());
+         return PREFIX + Base64.getEncoder().encodeToString(hash);
+      }
+   };
+
+   private static final String REDACTED_VALUE = "[REDACTED]";
+   public abstract String anonymize(String role);
+}
\ No newline at end of file
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/anonymizer/package-info.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/anonymizer/package-info.java
new file mode 100644
index 00000000000..7b1c506bf72
--- /dev/null
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/anonymizer/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Pulsar Client API.
+ */
+package org.apache.pulsar.common.configuration.anonymizer;
+
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 44927c375b5..1c61d4c467f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -148,6 +148,7 @@ import org.apache.pulsar.common.api.proto.ProtocolVersion;
 import org.apache.pulsar.common.api.proto.Schema;
 import org.apache.pulsar.common.api.proto.ServerError;
 import org.apache.pulsar.common.api.proto.TxnAction;
+import 
org.apache.pulsar.common.configuration.anonymizer.DefaultAuthenticationRoleLoggingAnonymizer;
 import org.apache.pulsar.common.intercept.InterceptException;
 import org.apache.pulsar.common.lookup.data.LookupData;
 import org.apache.pulsar.common.naming.Metadata;
@@ -215,6 +216,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
     private AuthData originalAuthDataCopy;
     private boolean pendingAuthChallengeResponse = false;
     private ScheduledFuture<?> authRefreshTask;
+    private final DefaultAuthenticationRoleLoggingAnonymizer 
authenticationRoleLoggingAnonymizer;
 
     // Max number of pending requests per connections. If multiple producers 
are sharing the same connection the flow
     // control done by a single producer might not be enough to prevent write 
spikes on the broker.
@@ -352,6 +354,8 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         this.brokerInterceptor = this.service != null ? 
this.service.getInterceptor() : null;
         this.throttleTracker = new ServerCnxThrottleTracker(this);
         topicsPatternImplementation = 
conf.getTopicsPatternRegexImplementation();
+        this.authenticationRoleLoggingAnonymizer = new 
DefaultAuthenticationRoleLoggingAnonymizer(
+                conf.getAuthenticationRoleLoggingAnonymizer());
     }
 
     @Override
@@ -821,12 +825,14 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                     clientVersion, clientProtoVersion, proxyVersion);
         } else if (originalPrincipal != null) {
             log.info("[{}] connected role={} and originalAuthRole={} using 
authMethod={}, clientVersion={}, "
-                            + "clientProtocolVersion={}, proxyVersion={}", 
remoteAddress, authRole, originalPrincipal,
-                    authMethod, clientVersion, clientProtoVersion, 
proxyVersion);
+                            + "clientProtocolVersion={}, proxyVersion={}", 
remoteAddress,
+                    authenticationRoleLoggingAnonymizer.anonymize(authRole),
+                    
authenticationRoleLoggingAnonymizer.anonymize(originalPrincipal), authMethod, 
clientVersion,
+                    clientProtoVersion, proxyVersion);
         } else {
             log.info("[{}] connected with role={} using authMethod={}, 
clientVersion={}, clientProtocolVersion={}, "
-                            + "proxyVersion={}", remoteAddress, authRole, 
authMethod, clientVersion, clientProtoVersion,
-                    proxyVersion);
+                           + "proxyVersion={}", remoteAddress, 
authenticationRoleLoggingAnonymizer.anonymize(authRole),
+                    authMethod, clientVersion, clientProtoVersion, 
proxyVersion);
         }
         if (brokerInterceptor != null) {
             brokerInterceptor.onConnectionCreated(this);
@@ -1214,7 +1220,8 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 
         if (log.isDebugEnabled()) {
             log.debug("[{}] Handle subscribe command: auth role = {}, original 
auth role = {}",
-                remoteAddress, authRole, originalPrincipal);
+                remoteAddress, 
authenticationRoleLoggingAnonymizer.anonymize(authRole),
+                    
authenticationRoleLoggingAnonymizer.anonymize(originalPrincipal));
         }
 
         final String subscriptionName = subscribe.getSubscription();
@@ -2433,11 +2440,11 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         return isProxyAuthorizedFuture.thenCombine(isAuthorizedFuture, 
(isProxyAuthorized, isAuthorized) -> {
             if (!isProxyAuthorized) {
                 log.warn("OriginalRole {} is not authorized to perform 
operation {} on namespace {}",
-                        originalPrincipal, operation, namespaceName);
+                        
authenticationRoleLoggingAnonymizer.anonymize(originalPrincipal), operation, 
namespaceName);
             }
             if (!isAuthorized) {
                 log.warn("Role {} is not authorized to perform operation {} on 
namespace {}",
-                        authRole, operation, namespaceName);
+                        
authenticationRoleLoggingAnonymizer.anonymize(authRole), operation, 
namespaceName);
             }
             return isProxyAuthorized && isAuthorized;
         });
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index d89801d360b..6db1b302c66 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -295,6 +295,13 @@ public class ProxyConfiguration implements 
PulsarConfiguration {
                     + "is enabled.")
     private Boolean webServiceLogDetailedAddresses;
 
+    @FieldContext(category = CATEGORY_SERVER, doc =
+            "Defines how the broker will anonymize the role and 
originalAuthRole before logging. "
+                    + "Possible values are: NONE (no anonymization), REDACTED 
(replaces with '[REDACTED]'), "
+                    + "hash:SHA256 (hashes using SHA-256), and hash:MD5 
(hashes using MD5). Default is NONE."
+    )
+    private String authenticationRoleLoggingAnonymizer = "NONE";
+
     @FieldContext(category = CATEGORY_SERVER,
             doc = "Enables zero-copy transport of data across network 
interfaces using the spice. "
                     + "Zero copy mode cannot be used when TLS is enabled or 
when proxyLogLevel is > 0.")
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 4f89f4bc17e..372d45ffe60 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -72,6 +72,7 @@ import 
org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata;
 import org.apache.pulsar.common.api.proto.FeatureFlags;
 import org.apache.pulsar.common.api.proto.ProtocolVersion;
 import org.apache.pulsar.common.api.proto.ServerError;
+import 
org.apache.pulsar.common.configuration.anonymizer.DefaultAuthenticationRoleLoggingAnonymizer;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.PulsarHandler;
 import org.apache.pulsar.common.util.Runnables;
@@ -120,6 +121,7 @@ public class ProxyConnection extends PulsarHandler {
     private int protocolVersionToAdvertise;
     private String proxyToBrokerUrl;
     private HAProxyMessage haProxyMessage;
+    private final DefaultAuthenticationRoleLoggingAnonymizer 
authenticationRoleLoggingAnonymizer;
 
     protected static final Integer SPLICE_BYTES = 1024 * 1024 * 1024;
     private static final byte[] EMPTY_CREDENTIALS = new byte[0];
@@ -161,6 +163,8 @@ public class ProxyConnection extends PulsarHandler {
         this.state = State.Init;
         this.brokerProxyValidator = service.getBrokerProxyValidator();
         this.connectionController = proxyService.getConnectionController();
+        this.authenticationRoleLoggingAnonymizer = new 
DefaultAuthenticationRoleLoggingAnonymizer(
+                
proxyService.getConfiguration().getAuthenticationRoleLoggingAnonymizer());
     }
 
     @Override
@@ -343,8 +347,9 @@ public class ProxyConnection extends PulsarHandler {
 
     private synchronized void completeConnect() throws PulsarClientException {
         checkArgument(state == State.Connecting);
+        String maybeAnonymizedClientAuthRole = 
authenticationRoleLoggingAnonymizer.anonymize(clientAuthRole);
         LOG.info("[{}] complete connection, init proxy handler. authenticated 
with {} role {}, hasProxyToBrokerUrl: {}",
-                remoteAddress, authMethod, clientAuthRole, 
hasProxyToBrokerUrl);
+                remoteAddress, authMethod, maybeAnonymizedClientAuthRole, 
hasProxyToBrokerUrl);
         if (hasProxyToBrokerUrl) {
             // Optimize proxy connection to fail-fast if the target broker 
isn't active
             // Pulsar client will retry connecting after a back off timeout
@@ -352,7 +357,7 @@ public class ProxyConnection extends PulsarHandler {
                     && !isBrokerActive(proxyToBrokerUrl)) {
                 state = State.Closing;
                 LOG.warn("[{}] Target broker '{}' isn't available. 
authenticated with {} role {}.",
-                        remoteAddress, proxyToBrokerUrl, authMethod, 
clientAuthRole);
+                        remoteAddress, proxyToBrokerUrl, authMethod, 
maybeAnonymizedClientAuthRole);
                 final ByteBuf msg = Commands.newError(-1,
                         ServerError.ServiceNotReady, "Target broker isn't 
available.");
                 writeAndFlushAndClose(msg);
@@ -371,10 +376,11 @@ public class ProxyConnection extends PulsarHandler {
 
                             LOG.warn("[{}] Target broker '{}' cannot be 
validated. {}. authenticated with {} role {}.",
                                     remoteAddress, proxyToBrokerUrl, 
targetAddressDeniedException.getMessage(),
-                                    authMethod, clientAuthRole);
+                                    authMethod, maybeAnonymizedClientAuthRole);
                         } else {
                             LOG.error("[{}] Error validating target broker 
'{}'. authenticated with {} role {}.",
-                                    remoteAddress, proxyToBrokerUrl, 
authMethod, clientAuthRole, throwable);
+                                    remoteAddress, proxyToBrokerUrl, 
authMethod, maybeAnonymizedClientAuthRole,
+                                    throwable);
                         }
                         final ByteBuf msg = Commands.newError(-1, 
ServerError.ServiceNotReady,
                                 "Target broker cannot be validated.");
@@ -401,7 +407,7 @@ public class ProxyConnection extends PulsarHandler {
                         
Optional.of(dnsAddressResolverGroup.getResolver(service.getWorkerGroup().next())),
 null);
             } else {
                 LOG.error("BUG! Connection Pool has already been created for 
proxy connection to {} state {} role {}",
-                        remoteAddress, state, clientAuthRole);
+                        remoteAddress, state, maybeAnonymizedClientAuthRole);
             }
 
             state = State.ProxyLookupRequests;
@@ -488,7 +494,7 @@ public class ProxyConnection extends PulsarHandler {
                 clientAuthRole = authState.getAuthRole();
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("[{}] Client successfully authenticated with {} 
role {}",
-                            remoteAddress, authMethod, clientAuthRole);
+                            remoteAddress, authMethod, 
authenticationRoleLoggingAnonymizer.anonymize(clientAuthRole));
                 }
 
                 // First connection

Reply via email to