This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.1 by this push:
new 599e4a90205 MINOR: Improve some requests/responses toString method
(#20759)
599e4a90205 is described below
commit 599e4a9020585666215a38639c0d51e905bdc706
Author: Luke Chen <[email protected]>
AuthorDate: Sat Oct 25 22:57:15 2025 +0800
MINOR: Improve some requests/responses toString method (#20759)
Improve some requests/responses toString method to log only the
required
info, including the request.Builder toString methods.
1. AlterConfigsRequest
2. AlterUserScramCredentialsRequest
3. ExpireDelegationTokenRequest
4. IncrementalAlterConfigsRequest
5. RenewDelegationTokenRequest
6. SaslAuthenticateRequest
7. createDelegationTokenResponse
8. describeDelegationTokenResponse
9. SaslAuthenticateResponse
Reviewers: Chia-Ping Tsai <[email protected]>, Manikumar Reddy
<[email protected]>
---
.../kafka/common/requests/AlterConfigsRequest.java | 21 +++++++++
.../requests/AlterUserScramCredentialsRequest.java | 23 +++++----
.../requests/CreateDelegationTokenResponse.java | 9 ++++
.../requests/DescribeDelegationTokenResponse.java | 11 +++++
.../requests/ExpireDelegationTokenRequest.java | 14 +++++-
.../requests/IncrementalAlterConfigsRequest.java | 24 +++++-----
.../requests/RenewDelegationTokenRequest.java | 14 +++++-
.../common/requests/SaslAuthenticateRequest.java | 8 ++++
.../common/requests/SaslAuthenticateResponse.java | 8 ++++
.../kafka/common/requests/RequestResponseTest.java | 55 ++++++++++++++++++++--
.../unit/kafka/network/RequestChannelTest.scala | 18 +++++--
11 files changed, 171 insertions(+), 34 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java
index f36fe3bf6fb..83b3fe5d549 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java
@@ -87,6 +87,11 @@ public class AlterConfigsRequest extends AbstractRequest {
public AlterConfigsRequest build(short version) {
return new AlterConfigsRequest(data, version);
}
+
+ @Override
+ public String toString() {
+ return maskData(data);
+ }
}
private final AlterConfigsRequestData data;
@@ -134,4 +139,20 @@ public class AlterConfigsRequest extends AbstractRequest {
public static AlterConfigsRequest parse(Readable readable, short version) {
return new AlterConfigsRequest(new AlterConfigsRequestData(readable,
version), version);
}
+
+ // It is not safe to print all config values
+ private static String maskData(AlterConfigsRequestData data) {
+ AlterConfigsRequestData tempData = data.duplicate();
+ tempData.resources().forEach(resource -> {
+ resource.configs().forEach(config -> {
+ config.setValue("REDACTED");
+ });
+ });
+ return tempData.toString();
+ }
+
+ @Override
+ public String toString() {
+ return maskData(data);
+ }
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/AlterUserScramCredentialsRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/AlterUserScramCredentialsRequest.java
index 7f2991cfad7..5fea5e2c5cb 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/AlterUserScramCredentialsRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/AlterUserScramCredentialsRequest.java
@@ -17,14 +17,10 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData;
-import
org.apache.kafka.common.message.AlterUserScramCredentialsRequestDataJsonConverter;
import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Readable;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
@@ -47,7 +43,7 @@ public class AlterUserScramCredentialsRequest extends
AbstractRequest {
@Override
public String toString() {
- return data.toString();
+ return maskData(data);
}
}
@@ -86,15 +82,18 @@ public class AlterUserScramCredentialsRequest extends
AbstractRequest {
return new AlterUserScramCredentialsResponse(new
AlterUserScramCredentialsResponseData().setResults(results));
}
+ private static String maskData(AlterUserScramCredentialsRequestData data) {
+ AlterUserScramCredentialsRequestData tempData = data.duplicate();
+ tempData.upsertions().forEach(upsertion -> {
+ upsertion.setSalt(new byte[0]);
+ upsertion.setSaltedPassword(new byte[0]);
+ });
+ return tempData.toString();
+ }
+
// Do not print salt or saltedPassword
@Override
public String toString() {
- JsonNode json =
AlterUserScramCredentialsRequestDataJsonConverter.write(data,
version()).deepCopy();
-
- for (JsonNode upsertion : json.get("upsertions")) {
- ((ObjectNode) upsertion).put("salt", "");
- ((ObjectNode) upsertion).put("saltedPassword", "");
- }
- return AlterUserScramCredentialsRequestDataJsonConverter.read(json,
version()).toString();
+ return maskData(data);
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenResponse.java
index ce577d48d97..445749610d8 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenResponse.java
@@ -103,4 +103,13 @@ public class CreateDelegationTokenResponse extends
AbstractResponse {
public boolean shouldClientThrottle(short version) {
return version >= 1;
}
+
+ // Do not print tokenId and Hmac, overwrite a temp copy of the data with
empty content
+ @Override
+ public String toString() {
+ CreateDelegationTokenResponseData tempData = data.duplicate();
+ tempData.setTokenId("REDACTED");
+ tempData.setHmac(new byte[0]);
+ return tempData.toString();
+ }
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java
index d0476a3772c..38a0e7dd660 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java
@@ -127,4 +127,15 @@ public class DescribeDelegationTokenResponse extends
AbstractResponse {
public boolean shouldClientThrottle(short version) {
return version >= 1;
}
+
+ // Do not print tokenId and Hmac, overwrite a temp copy of the data with
empty content
+ @Override
+ public String toString() {
+ DescribeDelegationTokenResponseData tempData = data.duplicate();
+ tempData.tokens().forEach(token -> {
+ token.setTokenId("REDACTED");
+ token.setHmac(new byte[0]);
+ });
+ return tempData.toString();
+ }
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java
index 37f10f61f3d..58859368caf 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java
@@ -74,7 +74,19 @@ public class ExpireDelegationTokenRequest extends
AbstractRequest {
@Override
public String toString() {
- return data.toString();
+ return maskData(data);
}
}
+
+ private static String maskData(ExpireDelegationTokenRequestData data) {
+ ExpireDelegationTokenRequestData tempData = data.duplicate();
+ tempData.setHmac(new byte[0]);
+ return tempData.toString();
+ }
+
+ // Do not print Hmac, overwrite a temp copy of the data with empty content
+ @Override
+ public String toString() {
+ return maskData(data);
+ }
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java
index 59cf8f2f138..889680fefc7 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java
@@ -21,15 +21,11 @@ import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
import
org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource;
-import
org.apache.kafka.common.message.IncrementalAlterConfigsRequestDataJsonConverter;
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
import
org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Readable;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
import java.util.Collection;
import java.util.Map;
@@ -76,7 +72,7 @@ public class IncrementalAlterConfigsRequest extends
AbstractRequest {
@Override
public String toString() {
- return data.toString();
+ return maskData(data);
}
}
@@ -112,14 +108,18 @@ public class IncrementalAlterConfigsRequest extends
AbstractRequest {
}
// It is not safe to print all config values
+ private static String maskData(IncrementalAlterConfigsRequestData data) {
+ IncrementalAlterConfigsRequestData tempData = data.duplicate();
+ tempData.resources().forEach(resource -> {
+ resource.configs().forEach(config -> {
+ config.setValue("REDACTED");
+ });
+ });
+ return tempData.toString();
+ }
+
@Override
public String toString() {
- JsonNode json =
IncrementalAlterConfigsRequestDataJsonConverter.write(data,
version()).deepCopy();
- for (JsonNode resource : json.get("resources")) {
- for (JsonNode config : resource.get("configs")) {
- ((ObjectNode) config).put("value", "REDACTED");
- }
- }
- return IncrementalAlterConfigsRequestDataJsonConverter.read(json,
version()).toString();
+ return maskData(data);
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java
index 17af0a3c271..178c438eabf 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java
@@ -64,7 +64,19 @@ public class RenewDelegationTokenRequest extends
AbstractRequest {
@Override
public String toString() {
- return data.toString();
+ return maskData(data);
}
}
+
+ private static String maskData(RenewDelegationTokenRequestData data) {
+ RenewDelegationTokenRequestData tempData = data.duplicate();
+ tempData.setHmac(new byte[0]);
+ return tempData.toString();
+ }
+
+ // Do not print Hmac, overwrite a temp copy of the data with empty content
+ @Override
+ public String toString() {
+ return maskData(data);
+ }
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateRequest.java
index 610b0845338..0d9e8b08b36 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateRequest.java
@@ -76,4 +76,12 @@ public class SaslAuthenticateRequest extends AbstractRequest
{
return new SaslAuthenticateRequest(new
SaslAuthenticateRequestData(readable, version),
version);
}
+
+ // Do not print authBytes, overwrite a temp copy of the data with empty
bytes
+ @Override
+ public String toString() {
+ SaslAuthenticateRequestData tempData = data.duplicate();
+ tempData.setAuthBytes(new byte[0]);
+ return tempData.toString();
+ }
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
index ba0fc29a391..12d16ee4184 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
@@ -79,4 +79,12 @@ public class SaslAuthenticateResponse extends
AbstractResponse {
public static SaslAuthenticateResponse parse(Readable readable, short
version) {
return new SaslAuthenticateResponse(new
SaslAuthenticateResponseData(readable, version));
}
+
+ // Do not print authBytes, overwrite a temp copy of the data with empty
bytes
+ @Override
+ public String toString() {
+ SaslAuthenticateResponseData tempData = data.duplicate();
+ tempData.setAuthBytes(new byte[0]);
+ return tempData.toString();
+ }
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 957544e2f73..676e2558fb8 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -297,6 +297,7 @@ import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.apache.kafka.common.protocol.ApiKeys.API_VERSIONS;
+import static org.apache.kafka.common.protocol.ApiKeys.CREATE_DELEGATION_TOKEN;
import static org.apache.kafka.common.protocol.ApiKeys.CREATE_PARTITIONS;
import static org.apache.kafka.common.protocol.ApiKeys.CREATE_TOPICS;
import static org.apache.kafka.common.protocol.ApiKeys.DELETE_ACLS;
@@ -3077,7 +3078,7 @@ public class RequestResponseTest {
}
private AlterConfigsRequest createAlterConfigsRequest(short version) {
- Map<ConfigResource, AlterConfigsRequest.Config> configs = new
HashMap<>();
+ Map<ConfigResource, AlterConfigsRequest.Config> configs = new
LinkedHashMap<>();
List<AlterConfigsRequest.ConfigEntry> configEntries = asList(
new AlterConfigsRequest.ConfigEntry("config_name",
"config_value"),
new AlterConfigsRequest.ConfigEntry("another_name", "another
value")
@@ -3085,7 +3086,19 @@ public class RequestResponseTest {
configs.put(new ConfigResource(ConfigResource.Type.BROKER, "0"), new
AlterConfigsRequest.Config(configEntries));
configs.put(new ConfigResource(ConfigResource.Type.TOPIC, "topic"),
new AlterConfigsRequest.Config(emptyList()));
- return new AlterConfigsRequest.Builder(configs, false).build(version);
+ AlterConfigsRequest alterConfigsRequest = new
AlterConfigsRequest.Builder(configs, false).build(version);
+ assertEquals(
+ "AlterConfigsRequestData(resources=[" +
+ "AlterConfigsResource(resourceType=" +
ConfigResource.Type.BROKER.id() + ", " +
+ "resourceName='0', " +
+ "configs=[AlterableConfig(name='config_name',
value='REDACTED'), " +
+ "AlterableConfig(name='another_name',
value='REDACTED')]), " +
+ "AlterConfigsResource(resourceType=" +
ConfigResource.Type.TOPIC.id() + ", " +
+ "resourceName='topic', configs=[])], " +
+ "validateOnly=false)",
+ alterConfigsRequest.toString()
+ );
+ return alterConfigsRequest;
}
private AlterConfigsResponse createAlterConfigsResponse() {
@@ -3188,7 +3201,12 @@ public class RequestResponseTest {
.setMaxTimestampMs(System.currentTimeMillis())
.setTokenId("token1")
.setHmac("test".getBytes());
- return new CreateDelegationTokenResponse(data);
+ var response = new CreateDelegationTokenResponse(data);
+
+ String responseStr = response.toString();
+ assertTrue(responseStr.contains("tokenId='REDACTED'"));
+ assertTrue(responseStr.contains("hmac=[]"));
+ return response;
}
private RenewDelegationTokenRequest createRenewTokenRequest(short version)
{
@@ -3244,7 +3262,14 @@ public class RequestResponseTest {
tokenList.add(new DelegationToken(tokenInfo1, "test".getBytes()));
tokenList.add(new DelegationToken(tokenInfo2, "test".getBytes()));
- return new DescribeDelegationTokenResponse(version, 20, Errors.NONE,
tokenList);
+ var response = new DescribeDelegationTokenResponse(version, 20,
Errors.NONE, tokenList);
+
+ String responseStr = response.toString();
+ String[] parts = responseStr.split(",");
+ // The 2 token info should both be redacted
+ assertEquals(2, Arrays.stream(parts).filter(s ->
s.trim().contains("tokenId='REDACTED'")).count());
+ assertEquals(2, Arrays.stream(parts).filter(s ->
s.trim().contains("hmac=[]")).count());
+ return response;
}
private ElectLeadersRequest createElectLeadersRequestNullPartitions() {
@@ -3964,6 +3989,28 @@ public class RequestResponseTest {
assertEquals("Error reading byte array of 32767 byte(s): only 3
byte(s) available", msg);
}
+ @Test
+ public void
testSaslAuthenticateRequestResponseToStringMasksSensitiveData() {
+ byte[] sensitiveAuthBytes =
"sensitive-auth-token-123".getBytes(StandardCharsets.UTF_8);
+ SaslAuthenticateRequestData requestData = new
SaslAuthenticateRequestData().setAuthBytes(sensitiveAuthBytes);
+ SaslAuthenticateRequest request = new
SaslAuthenticateRequest(requestData, (short) 2);
+
+ String requestString = request.toString();
+
+ // Verify that the authBytes field is present but empty in the output
+ assertTrue(requestString.contains("authBytes=[]"),
+ "authBytes field should be empty in toString() output");
+
+ SaslAuthenticateResponseData responseData = new
SaslAuthenticateResponseData().setAuthBytes(sensitiveAuthBytes);
+ SaslAuthenticateResponse response = new
SaslAuthenticateResponse(responseData);
+
+ String responseString = response.toString();
+
+ // Verify that the authBytes field is present but empty in the output
+ assertTrue(responseString.contains("authBytes=[]"),
+ "authBytes field should be empty in toString() output");
+ }
+
@Test
public void
testListConfigResourcesRequestV0FailsWithConfigResourceTypeOtherThanClientMetrics()
{
// One type which is not CLIENT_METRICS
diff --git a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
index 8dbfa808d7f..df10430ad72 100644
--- a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
+++ b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
@@ -64,13 +64,23 @@ class RequestChannelTest {
val sensitiveValue = "secret"
def verifyConfig(resource: ConfigResource, entries:
util.List[ConfigEntry], expectedValues: Map[String, String]): Unit = {
- val alterConfigs = request(new AlterConfigsRequest.Builder(
- util.Map.of(resource, new Config(entries)), true).build())
+ val alterConfigs = new AlterConfigsRequest.Builder(
+ util.Map.of(resource, new Config(entries)), true).build()
- val loggableAlterConfigs =
alterConfigs.loggableRequest.asInstanceOf[AlterConfigsRequest]
+ val alterConfigsString = alterConfigs.toString
+ entries.forEach { entry =>
+ if (!alterConfigsString.contains(entry.name())) {
+ fail("Config names should be in the request string")
+ }
+ if (entry.value() != null &&
alterConfigsString.contains(entry.value())) {
+ fail("Config values should not be in the request string")
+ }
+ }
+ val alterConfigsReq = request(alterConfigs)
+ val loggableAlterConfigs =
alterConfigsReq.loggableRequest.asInstanceOf[AlterConfigsRequest]
val loggedConfig = loggableAlterConfigs.configs.get(resource)
assertEquals(expectedValues, toMap(loggedConfig))
- val alterConfigsDesc =
RequestConvertToJson.requestDesc(alterConfigs.header,
alterConfigs.requestLog.toJava, alterConfigs.isForwarded).toString
+ val alterConfigsDesc =
RequestConvertToJson.requestDesc(alterConfigsReq.header,
alterConfigsReq.requestLog.toJava, alterConfigsReq.isForwarded).toString
assertFalse(alterConfigsDesc.contains(sensitiveValue), s"Sensitive
config logged $alterConfigsDesc")
}