This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch 3.9 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.9 by this push: new 7e9d50a0c2e MINOR; Fix some Request toString methods (#19655) (#19690) 7e9d50a0c2e is described below commit 7e9d50a0c2e27fc53e5c719e5604cd6333f1474d Author: Alyssa Huang <ahu...@confluent.io> AuthorDate: Tue May 27 15:38:05 2025 -0700 MINOR; Fix some Request toString methods (#19655) (#19690) Reviewers: Colin P. McCabe <cmcc...@apache.org> ``` Conflicts: clients/src/main/java/org/apache/kafka/common/requests/AlterUserScramCredentialsRequest.java - import statement clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java - import statement core/src/test/scala/unit/kafka/network/RequestChannelTest.scala - difference in unrelated parameter core/src/test/scala/unit/kafka/server/KafkaApisTest.scala - different logging and metadatacache instantiation ``` Cherry-Picked-From: 042be5b9ac3f4be83069b18f17fda382e07d6539 Cherry-Picked-By: Alyssa Huang <ahu...@confluent.io> Cherry-Picked-At: Mon May 12 11:11:19 2025 -0700 --- checkstyle/import-control.xml | 2 ++ .../kafka/common/requests/AbstractRequest.java | 2 +- .../requests/AlterUserScramCredentialsRequest.java | 16 ++++++++++++++++ .../requests/IncrementalAlterConfigsRequest.java | 16 ++++++++++++++++ .../unit/kafka/network/RequestChannelTest.scala | 16 +++++++++++++--- .../AlterUserScramCredentialsRequestTest.scala | 14 ++++++++++++-- .../test/scala/unit/kafka/server/KafkaApisTest.scala | 20 +++++++++++++++++--- 7 files changed, 77 insertions(+), 9 deletions(-) diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index a3b1c69796a..485c23fbb58 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -191,6 +191,8 @@ <allow pkg="io.opentelemetry.proto"/> <!-- for testing --> <allow pkg="org.apache.kafka.common.telemetry" /> + <!-- for IncrementalAlterConfigsRequest and AlterUserScramCredentialsRequest --> + <allow pkg="com.fasterxml.jackson.databind" /> </subpackage> <subpackage name="serialization"> diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java index 87a7a826869..0ecc734306f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java @@ -136,7 +136,7 @@ public abstract class AbstractRequest implements AbstractRequestResponse { } @Override - public final String toString() { + public String toString() { return toString(true); } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterUserScramCredentialsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterUserScramCredentialsRequest.java index 1ca7ea77aa4..c779f6d9a84 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AlterUserScramCredentialsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterUserScramCredentialsRequest.java @@ -17,10 +17,14 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData; +import org.apache.kafka.common.message.AlterUserScramCredentialsRequestDataJsonConverter; import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ByteBufferAccessor; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; + import java.nio.ByteBuffer; import java.util.List; import java.util.Set; @@ -82,4 +86,16 @@ public class AlterUserScramCredentialsRequest extends AbstractRequest { .collect(Collectors.toList()); return new AlterUserScramCredentialsResponse(new AlterUserScramCredentialsResponseData().setResults(results)); } + + // Do not print salt or saltedPassword + @Override + public String toString() { + JsonNode json = AlterUserScramCredentialsRequestDataJsonConverter.write(data, version()).deepCopy(); + + for (JsonNode upsertion : json.get("upsertions")) { + ((ObjectNode) upsertion).put("salt", ""); + ((ObjectNode) upsertion).put("saltedPassword", ""); + } + return AlterUserScramCredentialsRequestDataJsonConverter.read(json, version()).toString(); + } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java index 222097502b2..54540837756 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java @@ -21,11 +21,15 @@ import org.apache.kafka.clients.admin.AlterConfigOp; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData; import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource; +import org.apache.kafka.common.message.IncrementalAlterConfigsRequestDataJsonConverter; import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData; import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ByteBufferAccessor; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; + import java.nio.ByteBuffer; import java.util.Collection; import java.util.Map; @@ -107,4 +111,16 @@ public class IncrementalAlterConfigsRequest extends AbstractRequest { } return new IncrementalAlterConfigsResponse(response); } + + // It is not safe to print all config values + @Override + public String toString() { + JsonNode json = IncrementalAlterConfigsRequestDataJsonConverter.write(data, version()).deepCopy(); + for (JsonNode resource : json.get("resources")) { + for (JsonNode config : resource.get("configs")) { + ((ObjectNode) config).put("value", "REDACTED"); + } + } + return IncrementalAlterConfigsRequestDataJsonConverter.read(json, version()).toString(); + } } diff --git a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala index bddf03a136c..46bd1cf004a 100644 --- a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala +++ b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala @@ -128,11 +128,21 @@ class RequestChannelTest { op: OpType, entries: Map[String, String], expectedValues: Map[String, String]): Unit = { - val alterConfigs = request(incrementalAlterConfigs(resource, entries, op)) - val loggableAlterConfigs = alterConfigs.loggableRequest.asInstanceOf[IncrementalAlterConfigsRequest] + val alterConfigs = incrementalAlterConfigs(resource, entries, op) + val alterConfigsString = alterConfigs.toString + entries.foreach { entry => + if (!alterConfigsString.contains(entry._1)) { + fail("Config names should be in the request string") + } + if (entry._2 != null && alterConfigsString.contains(entry._2)) { + fail("Config values should not be in the request string") + } + } + val req = request(alterConfigs) + val loggableAlterConfigs = req.loggableRequest.asInstanceOf[IncrementalAlterConfigsRequest] val loggedConfig = loggableAlterConfigs.data.resources.find(resource.`type`.id, resource.name).configs assertEquals(expectedValues, toMap(loggedConfig)) - val alterConfigsDesc = RequestConvertToJson.requestDesc(alterConfigs.header, alterConfigs.requestLog, alterConfigs.isForwarded).toString + val alterConfigsDesc = RequestConvertToJson.requestDesc(req.header, req.requestLog, req.isForwarded).toString assertFalse(alterConfigsDesc.contains(sensitiveValue), s"Sensitive config logged $alterConfigsDesc") } diff --git a/core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestTest.scala index feb914a2528..51fbb0005b7 100644 --- a/core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestTest.scala @@ -263,6 +263,9 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest { // create a bunch of credentials val request1_0 = new AlterUserScramCredentialsRequest.Builder( new AlterUserScramCredentialsRequestData() + .setDeletions(util.Arrays.asList( + new AlterUserScramCredentialsRequestData.ScramCredentialDeletion() + .setName(user2).setMechanism(ScramMechanism.SCRAM_SHA_256.`type`))) .setUpsertions(util.Arrays.asList( new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion() .setName(user1).setMechanism(ScramMechanism.SCRAM_SHA_256.`type`) @@ -270,10 +273,15 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest { .setSalt(saltBytes) .setSaltedPassword(saltedPasswordBytes), ))).build() + assertEquals("AlterUserScramCredentialsRequestData(" + + "deletions=[ScramCredentialDeletion(name='" + user2 + "', mechanism=" + ScramMechanism.SCRAM_SHA_256.`type` + ")], " + + "upsertions=[ScramCredentialUpsertion(name='" + user1 + "', mechanism=" + ScramMechanism.SCRAM_SHA_256.`type` + + ", iterations=4096, salt=[], saltedPassword=[])])", request1_0.toString) val results1_0 = sendAlterUserScramCredentialsRequest(request1_0).data.results - assertEquals(1, results1_0.size) - checkNoErrorsAlteringCredentials(results1_0) + assertEquals(2, results1_0.size) + assertEquals(1, results1_0.asScala.count(_.errorCode == Errors.RESOURCE_NOT_FOUND.code())) checkUserAppearsInAlterResults(results1_0, user1) + checkUserAppearsInAlterResults(results1_0, user2) // When creating credentials, do not update the same user more than once per request val request1_1 = new AlterUserScramCredentialsRequest.Builder( @@ -295,6 +303,8 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest { .setSalt(saltBytes) .setSaltedPassword(saltedPasswordBytes), ))).build() + assertFalse(request1_1.toString.contains(saltBytes)) + assertFalse(request1_1.toString.contains(saltedPasswordBytes)) val results1_1 = sendAlterUserScramCredentialsRequest(request1_1).data.results assertEquals(3, results1_1.size) checkNoErrorsAlteringCredentials(results1_1) diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 25a06b71035..7f046c4a8ee 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -7137,7 +7137,12 @@ class KafkaApisTest extends Logging { @Test def testEmptyIncrementalAlterConfigsRequestWithKRaft(): Unit = { - val request = buildRequest(new IncrementalAlterConfigsRequest(new IncrementalAlterConfigsRequestData(), 1.toShort)) + val alterConfigsRequest = new IncrementalAlterConfigsRequest(new IncrementalAlterConfigsRequestData(), 1.toShort) + assertEquals( + "IncrementalAlterConfigsRequestData(resources=[], validateOnly=false)", + alterConfigsRequest.toString + ) + val request = buildRequest(alterConfigsRequest) metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](), any[Long])).thenReturn(0) @@ -7149,7 +7154,7 @@ class KafkaApisTest extends Logging { @Test def testLog4jIncrementalAlterConfigsRequestWithKRaft(): Unit = { - val request = buildRequest(new IncrementalAlterConfigsRequest(new IncrementalAlterConfigsRequestData(). + val alterConfigsRequest = new IncrementalAlterConfigsRequest(new IncrementalAlterConfigsRequestData(). setValidateOnly(true). setResources(new IAlterConfigsResourceCollection(asList(new IAlterConfigsResource(). setResourceName(brokerId.toString). @@ -7157,7 +7162,16 @@ class KafkaApisTest extends Logging { setConfigs(new IAlterableConfigCollection(asList(new IAlterableConfig(). setName(Log4jController.ROOT_LOGGER). setValue("TRACE")).iterator()))).iterator())), - 1.toShort)) + 1.toShort) + assertEquals( + "IncrementalAlterConfigsRequestData(resources=[" + + "AlterConfigsResource(resourceType=" + BROKER_LOGGER.id() + ", " + + "resourceName='"+ brokerId + "', " + + "configs=[AlterableConfig(name='" + Log4jController.ROOT_LOGGER + "', configOperation=0, value='REDACTED')])], " + + "validateOnly=true)", + alterConfigsRequest.toString + ) + val request = buildRequest(alterConfigsRequest) metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](), any[Long])).thenReturn(0)