This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 844bc0512b2 MINOR: Improve some requests/responses toString method 
(#20775)
844bc0512b2 is described below

commit 844bc0512b270b670789f9f8025b3168524cc271
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Tue Oct 28 21:58:12 2025 +0800

    MINOR: Improve some requests/responses toString method (#20775)
    
    Improve the toString methods of some requests/responses so that they
    log only the required info, including the request.Builder toString
    methods.
    1. AlterConfigsRequest
    2. AlterUserScramCredentialsRequest
    3. ExpireDelegationTokenRequest
    4. IncrementalAlterConfigsRequest
    5. RenewDelegationTokenRequest
    6. SaslAuthenticateRequest
    7. CreateDelegationTokenResponse
    8. DescribeDelegationTokenResponse
    9. SaslAuthenticateResponse
    
    Reviewers: Chia-Ping Tsai <[email protected]>
    
    Co-authored-by: Luke Chen <[email protected]>
---
 .../kafka/common/requests/AlterConfigsRequest.java | 21 +++++++++
 .../requests/AlterUserScramCredentialsRequest.java | 23 +++++----
 .../requests/CreateDelegationTokenResponse.java    |  9 ++++
 .../requests/DescribeDelegationTokenResponse.java  | 11 +++++
 .../requests/ExpireDelegationTokenRequest.java     | 14 +++++-
 .../requests/IncrementalAlterConfigsRequest.java   | 24 +++++-----
 .../requests/RenewDelegationTokenRequest.java      | 14 +++++-
 .../common/requests/SaslAuthenticateRequest.java   |  8 ++++
 .../common/requests/SaslAuthenticateResponse.java  |  8 ++++
 .../kafka/common/requests/RequestResponseTest.java | 55 ++++++++++++++++++++--
 .../unit/kafka/network/RequestChannelTest.scala    | 19 ++++++--
 11 files changed, 171 insertions(+), 35 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java
index b4d35d52ae3..df3ce5ee2e7 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java
@@ -88,6 +88,11 @@ public class AlterConfigsRequest extends AbstractRequest {
         public AlterConfigsRequest build(short version) {
             return new AlterConfigsRequest(data, version);
         }
+
+        @Override
+        public String toString() {
+            return maskData(data);
+        }
     }
 
     private final AlterConfigsRequestData data;
@@ -135,4 +140,20 @@ public class AlterConfigsRequest extends AbstractRequest {
     public static AlterConfigsRequest parse(ByteBuffer buffer, short version) {
         return new AlterConfigsRequest(new AlterConfigsRequestData(new 
ByteBufferAccessor(buffer), version), version);
     }
+
+    // It is not safe to print all config values
+    private static String maskData(AlterConfigsRequestData data) {
+        AlterConfigsRequestData tempData = data.duplicate();
+        tempData.resources().forEach(resource -> {
+            resource.configs().forEach(config -> {
+                config.setValue("REDACTED");
+            });
+        });
+        return tempData.toString();
+    }
+
+    @Override
+    public String toString() {
+        return maskData(data);
+    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/AlterUserScramCredentialsRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/AlterUserScramCredentialsRequest.java
index c779f6d9a84..a51aa7dc406 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/AlterUserScramCredentialsRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/AlterUserScramCredentialsRequest.java
@@ -17,14 +17,10 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData;
-import 
org.apache.kafka.common.message.AlterUserScramCredentialsRequestDataJsonConverter;
 import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Set;
@@ -48,7 +44,7 @@ public class AlterUserScramCredentialsRequest extends 
AbstractRequest {
 
         @Override
         public String toString() {
-            return data.toString();
+            return maskData(data);
         }
     }
 
@@ -87,15 +83,18 @@ public class AlterUserScramCredentialsRequest extends 
AbstractRequest {
         return new AlterUserScramCredentialsResponse(new 
AlterUserScramCredentialsResponseData().setResults(results));
     }
 
+    private static String maskData(AlterUserScramCredentialsRequestData data) {
+        AlterUserScramCredentialsRequestData tempData = data.duplicate();
+        tempData.upsertions().forEach(upsertion -> {
+            upsertion.setSalt(new byte[0]);
+            upsertion.setSaltedPassword(new byte[0]);
+        });
+        return tempData.toString();
+    }
+
     // Do not print salt or saltedPassword
     @Override
     public String toString() {
-        JsonNode json = 
AlterUserScramCredentialsRequestDataJsonConverter.write(data, 
version()).deepCopy();
-
-        for (JsonNode upsertion : json.get("upsertions")) {
-            ((ObjectNode) upsertion).put("salt", "");
-            ((ObjectNode) upsertion).put("saltedPassword", "");
-        }
-        return AlterUserScramCredentialsRequestDataJsonConverter.read(json, 
version()).toString();
+        return maskData(data);
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenResponse.java
index 0a9f9a8991b..287013d3aad 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenResponse.java
@@ -103,4 +103,13 @@ public class CreateDelegationTokenResponse extends 
AbstractResponse {
     public boolean shouldClientThrottle(short version) {
         return version >= 1;
     }
+
+    // Do not print tokenId and Hmac, overwrite a temp copy of the data with empty content
+    @Override
+    public String toString() {
+        CreateDelegationTokenResponseData tempData = data.duplicate();
+        tempData.setTokenId("REDACTED");
+        tempData.setHmac(new byte[0]);
+        return tempData.toString();
+    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java
index a922f056a89..87b358249f5 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java
@@ -128,4 +128,15 @@ public class DescribeDelegationTokenResponse extends 
AbstractResponse {
     public boolean shouldClientThrottle(short version) {
         return version >= 1;
     }
+
+    // Do not print tokenId and Hmac, overwrite a temp copy of the data with empty content
+    @Override
+    public String toString() {
+        DescribeDelegationTokenResponseData tempData = data.duplicate();
+        tempData.tokens().forEach(token -> {
+            token.setTokenId("REDACTED");
+            token.setHmac(new byte[0]);
+        });
+        return tempData.toString();
+    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java
index 3660a456460..ab4bcd9533e 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java
@@ -74,7 +74,19 @@ public class ExpireDelegationTokenRequest extends 
AbstractRequest {
 
         @Override
         public String toString() {
-            return data.toString();
+            return maskData(data);
         }
     }
+
+    private static String maskData(ExpireDelegationTokenRequestData data) {
+        ExpireDelegationTokenRequestData tempData = data.duplicate();
+        tempData.setHmac(new byte[0]);
+        return tempData.toString();
+    }
+
+    // Do not print Hmac, overwrite a temp copy of the data with empty content
+    @Override
+    public String toString() {
+        return maskData(data);
+    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java
index 54540837756..bfac0e4074d 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java
@@ -21,15 +21,11 @@ import org.apache.kafka.clients.admin.AlterConfigOp;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
 import 
org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource;
-import 
org.apache.kafka.common.message.IncrementalAlterConfigsRequestDataJsonConverter;
 import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
 import 
org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Map;
@@ -77,7 +73,7 @@ public class IncrementalAlterConfigsRequest extends 
AbstractRequest {
 
         @Override
         public String toString() {
-            return data.toString();
+            return maskData(data);
         }
     }
 
@@ -113,14 +109,18 @@ public class IncrementalAlterConfigsRequest extends 
AbstractRequest {
     }
 
     // It is not safe to print all config values
+    private static String maskData(IncrementalAlterConfigsRequestData data) {
+        IncrementalAlterConfigsRequestData tempData = data.duplicate();
+        tempData.resources().forEach(resource -> {
+            resource.configs().forEach(config -> {
+                config.setValue("REDACTED");
+            });
+        });
+        return tempData.toString();
+    }
+
     @Override
     public String toString() {
-        JsonNode json = 
IncrementalAlterConfigsRequestDataJsonConverter.write(data, 
version()).deepCopy();
-        for (JsonNode resource : json.get("resources")) {
-            for (JsonNode config : resource.get("configs")) {
-                ((ObjectNode) config).put("value", "REDACTED");
-            }
-        }
-        return IncrementalAlterConfigsRequestDataJsonConverter.read(json, 
version()).toString();
+        return maskData(data);
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java
index 963097093dc..deeb0dbf23a 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java
@@ -66,7 +66,19 @@ public class RenewDelegationTokenRequest extends 
AbstractRequest {
 
         @Override
         public String toString() {
-            return data.toString();
+            return maskData(data);
         }
     }
+
+    private static String maskData(RenewDelegationTokenRequestData data) {
+        RenewDelegationTokenRequestData tempData = data.duplicate();
+        tempData.setHmac(new byte[0]);
+        return tempData.toString();
+    }
+
+    // Do not print Hmac, overwrite a temp copy of the data with empty content
+    @Override
+    public String toString() {
+        return maskData(data);
+    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateRequest.java
index 47dd5fd3157..0816744a97c 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateRequest.java
@@ -78,4 +78,12 @@ public class SaslAuthenticateRequest extends AbstractRequest 
{
         return new SaslAuthenticateRequest(new SaslAuthenticateRequestData(new 
ByteBufferAccessor(buffer), version),
             version);
     }
+
+    // Do not print authBytes, overwrite a temp copy of the data with empty bytes
+    @Override
+    public String toString() {
+        SaslAuthenticateRequestData tempData = data.duplicate();
+        tempData.setAuthBytes(new byte[0]);
+        return tempData.toString();
+    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
index d6ca8c170dc..810595279f0 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
@@ -80,4 +80,12 @@ public class SaslAuthenticateResponse extends 
AbstractResponse {
     public static SaslAuthenticateResponse parse(ByteBuffer buffer, short 
version) {
         return new SaslAuthenticateResponse(new 
SaslAuthenticateResponseData(new ByteBufferAccessor(buffer), version));
     }
+
+    // Do not print authBytes, overwrite a temp copy of the data with empty bytes
+    @Override
+    public String toString() {
+        SaslAuthenticateResponseData tempData = data.duplicate();
+        tempData.setAuthBytes(new byte[0]);
+        return tempData.toString();
+    }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index ad52482a3b3..008e60d6e94 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -305,7 +305,6 @@ import static 
org.apache.kafka.common.protocol.ApiKeys.OFFSET_FETCH;
 import static org.apache.kafka.common.protocol.ApiKeys.PRODUCE;
 import static org.apache.kafka.common.protocol.ApiKeys.SASL_AUTHENTICATE;
 import static org.apache.kafka.common.protocol.ApiKeys.SYNC_GROUP;
-import static org.apache.kafka.common.protocol.ApiKeys.WRITE_TXN_MARKERS;
 import static 
org.apache.kafka.common.requests.EndTxnRequest.LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2;
 import static 
org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -2989,7 +2988,7 @@ public class RequestResponseTest {
     }
 
     private AlterConfigsRequest createAlterConfigsRequest(short version) {
-        Map<ConfigResource, AlterConfigsRequest.Config> configs = new 
HashMap<>();
+        Map<ConfigResource, AlterConfigsRequest.Config> configs = new 
LinkedHashMap<>();
         List<AlterConfigsRequest.ConfigEntry> configEntries = asList(
                 new AlterConfigsRequest.ConfigEntry("config_name", 
"config_value"),
                 new AlterConfigsRequest.ConfigEntry("another_name", "another 
value")
@@ -2997,7 +2996,19 @@ public class RequestResponseTest {
         configs.put(new ConfigResource(ConfigResource.Type.BROKER, "0"), new 
AlterConfigsRequest.Config(configEntries));
         configs.put(new ConfigResource(ConfigResource.Type.TOPIC, "topic"),
                 new AlterConfigsRequest.Config(emptyList()));
-        return new AlterConfigsRequest.Builder(configs, false).build(version);
+        AlterConfigsRequest alterConfigsRequest = new 
AlterConfigsRequest.Builder(configs, false).build(version);
+        assertEquals(
+                "AlterConfigsRequestData(resources=[" +
+                        "AlterConfigsResource(resourceType=" + 
ConfigResource.Type.BROKER.id() + ", " +
+                        "resourceName='0', " +
+                        "configs=[AlterableConfig(name='config_name', 
value='REDACTED'), " +
+                        "AlterableConfig(name='another_name', 
value='REDACTED')]), " +
+                        "AlterConfigsResource(resourceType=" + 
ConfigResource.Type.TOPIC.id() + ", " +
+                        "resourceName='topic', configs=[])], " +
+                        "validateOnly=false)",
+                alterConfigsRequest.toString()
+        );
+        return alterConfigsRequest;
     }
 
     private AlterConfigsResponse createAlterConfigsResponse() {
@@ -3100,7 +3111,12 @@ public class RequestResponseTest {
                 .setMaxTimestampMs(System.currentTimeMillis())
                 .setTokenId("token1")
                 .setHmac("test".getBytes());
-        return new CreateDelegationTokenResponse(data);
+        var response = new CreateDelegationTokenResponse(data);
+
+        String responseStr = response.toString();
+        assertTrue(responseStr.contains("tokenId='REDACTED'"));
+        assertTrue(responseStr.contains("hmac=[]"));
+        return response;
     }
 
     private RenewDelegationTokenRequest createRenewTokenRequest(short version) 
{
@@ -3156,7 +3172,14 @@ public class RequestResponseTest {
         tokenList.add(new DelegationToken(tokenInfo1, "test".getBytes()));
         tokenList.add(new DelegationToken(tokenInfo2, "test".getBytes()));
 
-        return new DescribeDelegationTokenResponse(version, 20, Errors.NONE, 
tokenList);
+        var response = new DescribeDelegationTokenResponse(version, 20, 
Errors.NONE, tokenList);
+
+        String responseStr = response.toString();
+        String[] parts = responseStr.split(",");
+        // The 2 token info should both be redacted
+        assertEquals(2, Arrays.stream(parts).filter(s -> 
s.trim().contains("tokenId='REDACTED'")).count());
+        assertEquals(2, Arrays.stream(parts).filter(s -> 
s.trim().contains("hmac=[]")).count());
+        return response;
     }
 
     private ElectLeadersRequest createElectLeadersRequestNullPartitions() {
@@ -3773,4 +3796,26 @@ public class RequestResponseTest {
                 parseRequest(SASL_AUTHENTICATE, 
SASL_AUTHENTICATE.latestVersion(), accessor.buffer())).getMessage();
         assertEquals("Error reading byte array of 32767 byte(s): only 3 
byte(s) available", msg);
     }
+
+    @Test
+    public void 
testSaslAuthenticateRequestResponseToStringMasksSensitiveData() {
+        byte[] sensitiveAuthBytes = 
"sensitive-auth-token-123".getBytes(StandardCharsets.UTF_8);
+        SaslAuthenticateRequestData requestData = new 
SaslAuthenticateRequestData().setAuthBytes(sensitiveAuthBytes);
+        SaslAuthenticateRequest request = new 
SaslAuthenticateRequest(requestData, (short) 2);
+
+        String requestString = request.toString();
+
+        // Verify that the authBytes field is present but empty in the output
+        assertTrue(requestString.contains("authBytes=[]"),
+                "authBytes field should be empty in toString() output");
+
+        SaslAuthenticateResponseData responseData = new 
SaslAuthenticateResponseData().setAuthBytes(sensitiveAuthBytes);
+        SaslAuthenticateResponse response = new 
SaslAuthenticateResponse(responseData);
+
+        String responseString = response.toString();
+
+        // Verify that the authBytes field is present but empty in the output
+        assertTrue(responseString.contains("authBytes=[]"),
+                "authBytes field should be empty in toString() output");
+    }
 }
diff --git a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala 
b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
index ecea412e989..dacabd11763 100644
--- a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
+++ b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
@@ -47,6 +47,7 @@ import org.mockito.Mockito.mock
 import java.io.IOException
 import java.net.InetAddress
 import java.nio.ByteBuffer
+import java.util
 import java.util.Collections
 import java.util.concurrent.atomic.AtomicReference
 import scala.collection.{Map, Seq}
@@ -65,13 +66,23 @@ class RequestChannelTest {
 
     val sensitiveValue = "secret"
     def verifyConfig(resource: ConfigResource, entries: Seq[ConfigEntry], 
expectedValues: Map[String, String]): Unit = {
-      val alterConfigs = request(new AlterConfigsRequest.Builder(
-          Collections.singletonMap(resource, new 
Config(entries.asJavaCollection)), true).build())
+      val alterConfigs = new AlterConfigsRequest.Builder(
+          util.Map.of(resource, new Config(entries.asJavaCollection)), 
true).build()
 
-      val loggableAlterConfigs = 
alterConfigs.loggableRequest.asInstanceOf[AlterConfigsRequest]
+      val alterConfigsString = alterConfigs.toString
+      entries.foreach { entry =>
+        if (!alterConfigsString.contains(entry.name())) {
+          fail("Config names should be in the request string")
+        }
+        if (entry.value() != null && 
alterConfigsString.contains(entry.value())) {
+          fail("Config values should not be in the request string")
+        }
+      }
+      val alterConfigsReq = request(alterConfigs)
+      val loggableAlterConfigs = 
alterConfigsReq.loggableRequest.asInstanceOf[AlterConfigsRequest]
       val loggedConfig = loggableAlterConfigs.configs.get(resource)
       assertEquals(expectedValues, toMap(loggedConfig))
-      val alterConfigsDesc = 
RequestConvertToJson.requestDesc(alterConfigs.header, 
alterConfigs.requestLog.toJava, alterConfigs.isForwarded).toString
+      val alterConfigsDesc = 
RequestConvertToJson.requestDesc(alterConfigsReq.header, 
alterConfigsReq.requestLog.toJava, alterConfigsReq.isForwarded).toString
       assertFalse(alterConfigsDesc.contains(sensitiveValue), s"Sensitive 
config logged $alterConfigsDesc")
     }
 

Reply via email to