This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 18340c97337 KAFKA-17563 Move `RequestConvertToJson` to server module 
(#17223)
18340c97337 is described below

commit 18340c973378610cd5930974201088b6aca8a8ad
Author: xijiu <[email protected]>
AuthorDate: Fri Sep 27 02:19:47 2024 +0800

    KAFKA-17563 Move `RequestConvertToJson` to server module (#17223)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 checkstyle/import-control-server.xml               |   4 +
 checkstyle/suppressions.xml                        |   1 +
 .../main/scala/kafka/network/RequestChannel.scala  |   4 +-
 .../scala/kafka/network/RequestConvertToJson.scala | 267 -------
 .../unit/kafka/network/RequestChannelTest.scala    |   6 +-
 .../kafka/network/RequestConvertToJsonTest.scala   |  94 +--
 .../unit/kafka/network/SocketServerTest.scala      |   1 +
 .../kafka/jmh/common/FetchRequestBenchmark.java    |   3 +-
 .../jmh/common/ListOffsetRequestBenchmark.java     |   3 +-
 .../kafka/jmh/common/ProduceRequestBenchmark.java  |   3 +-
 .../metadata/KRaftMetadataRequestBenchmark.java    |   6 +-
 .../jmh/metadata/MetadataRequestBenchmark.java     |   6 +-
 .../apache/kafka/network/RequestConvertToJson.java | 812 +++++++++++++++++++++
 .../kafka/network/RequestConvertToJsonTest.java    | 126 ++++
 14 files changed, 968 insertions(+), 368 deletions(-)

diff --git a/checkstyle/import-control-server.xml 
b/checkstyle/import-control-server.xml
index 22ab6a449e6..f046ceb74a6 100644
--- a/checkstyle/import-control-server.xml
+++ b/checkstyle/import-control-server.xml
@@ -99,4 +99,8 @@
     <allow pkg="org.apache.kafka.server.authorizer" />
   </subpackage>
 
+  <subpackage name="network">
+    <allow pkg="com.fasterxml.jackson" />
+  </subpackage>
+
 </import-control>
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index b640b4ae7c5..14d0630d2e9 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -50,6 +50,7 @@
     <!-- server tests -->
     <suppress checks="MethodLength|JavaNCSS|NPath" 
files="DescribeTopicPartitionsRequestHandlerTest.java"/>
     <suppress checks="CyclomaticComplexity" 
files="ListConsumerGroupTest.java"/>
+    <suppress 
checks="ClassFanOutComplexity|CyclomaticComplexity|MethodLength|ParameterNumber|JavaNCSS|ImportControl"
 files="RequestConvertToJson.java"/>
 
     <!-- Clients -->
     <suppress id="dontUseSystemExit"
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala 
b/core/src/main/scala/kafka/network/RequestChannel.scala
index 9c9b2a63dfe..ca4ab90957b 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -36,8 +36,10 @@ import org.apache.kafka.network.Session
 import org.apache.kafka.network.metrics.{RequestChannelMetrics, RequestMetrics}
 import org.apache.kafka.server.common.RequestLocal
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
+import org.apache.kafka.network.RequestConvertToJson
 
 import scala.jdk.CollectionConverters._
+import scala.compat.java8.OptionConverters._
 import scala.reflect.ClassTag
 
 object RequestChannel extends Logging {
@@ -249,7 +251,7 @@ object RequestChannel extends Logging {
       recordNetworkThreadTimeCallback.foreach(record => 
record(networkThreadTimeNanos))
 
       if (isRequestLoggingEnabled) {
-        val desc = RequestConvertToJson.requestDescMetrics(header, requestLog, 
response.responseLog,
+        val desc = RequestConvertToJson.requestDescMetrics(header, 
requestLog.asJava, response.responseLog.asJava,
           context, session, isForwarded,
           totalTimeMs, requestQueueTimeMs, apiLocalTimeMs,
           apiRemoteTimeMs, apiThrottleTimeMs, responseQueueTimeMs,
diff --git a/core/src/main/scala/kafka/network/RequestConvertToJson.scala 
b/core/src/main/scala/kafka/network/RequestConvertToJson.scala
deleted file mode 100644
index a51894bd332..00000000000
--- a/core/src/main/scala/kafka/network/RequestConvertToJson.scala
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.network
-
-import com.fasterxml.jackson.databind.JsonNode
-import com.fasterxml.jackson.databind.node.{BooleanNode, DoubleNode, 
JsonNodeFactory, LongNode, ObjectNode, TextNode}
-import org.apache.kafka.common.message._
-import org.apache.kafka.common.network.ClientInformation
-import org.apache.kafka.common.requests._
-import org.apache.kafka.network.Session
-
-object RequestConvertToJson {
-  def request(request: AbstractRequest): JsonNode = {
-    request match {
-      case req: AddOffsetsToTxnRequest => 
AddOffsetsToTxnRequestDataJsonConverter.write(req.data, request.version)
-      case req: AddPartitionsToTxnRequest => 
AddPartitionsToTxnRequestDataJsonConverter.write(req.data, request.version)
-      case req: AllocateProducerIdsRequest => 
AllocateProducerIdsRequestDataJsonConverter.write(req.data, request.version)
-      case req: AlterClientQuotasRequest => 
AlterClientQuotasRequestDataJsonConverter.write(req.data, request.version)
-      case req: AlterConfigsRequest => 
AlterConfigsRequestDataJsonConverter.write(req.data, request.version)
-      case req: AlterPartitionReassignmentsRequest => 
AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data, 
request.version)
-      case req: AlterPartitionRequest => 
AlterPartitionRequestDataJsonConverter.write(req.data, request.version)
-      case req: AlterReplicaLogDirsRequest => 
AlterReplicaLogDirsRequestDataJsonConverter.write(req.data, request.version)
-      case res: AlterUserScramCredentialsRequest => 
AlterUserScramCredentialsRequestDataJsonConverter.write(res.data, 
request.version)
-      case req: ApiVersionsRequest => 
ApiVersionsRequestDataJsonConverter.write(req.data, request.version)
-      case req: AssignReplicasToDirsRequest => 
AssignReplicasToDirsRequestDataJsonConverter.write(req.data, request.version)
-      case req: BeginQuorumEpochRequest => 
BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version)
-      case req: BrokerHeartbeatRequest => 
BrokerHeartbeatRequestDataJsonConverter.write(req.data, request.version)
-      case req: BrokerRegistrationRequest => 
BrokerRegistrationRequestDataJsonConverter.write(req.data, request.version)
-      case req: ConsumerGroupDescribeRequest => 
ConsumerGroupDescribeRequestDataJsonConverter.write(req.data, request.version)
-      case req: ConsumerGroupHeartbeatRequest => 
ConsumerGroupHeartbeatRequestDataJsonConverter.write(req.data, request.version)
-      case req: ControlledShutdownRequest => 
ControlledShutdownRequestDataJsonConverter.write(req.data, request.version)
-      case req: ControllerRegistrationRequest => 
ControllerRegistrationRequestDataJsonConverter.write(req.data, request.version)
-      case req: CreateAclsRequest => 
CreateAclsRequestDataJsonConverter.write(req.data, request.version)
-      case req: CreateDelegationTokenRequest => 
CreateDelegationTokenRequestDataJsonConverter.write(req.data, request.version)
-      case req: CreatePartitionsRequest => 
CreatePartitionsRequestDataJsonConverter.write(req.data, request.version)
-      case req: CreateTopicsRequest => 
CreateTopicsRequestDataJsonConverter.write(req.data, request.version)
-      case req: DeleteAclsRequest => 
DeleteAclsRequestDataJsonConverter.write(req.data, request.version)
-      case req: DeleteGroupsRequest => 
DeleteGroupsRequestDataJsonConverter.write(req.data, request.version)
-      case req: DeleteRecordsRequest => 
DeleteRecordsRequestDataJsonConverter.write(req.data, request.version)
-      case req: DeleteShareGroupStateRequest => 
DeleteShareGroupStateRequestDataJsonConverter.write(req.data, request.version)
-      case req: DeleteTopicsRequest => 
DeleteTopicsRequestDataJsonConverter.write(req.data, request.version)
-      case req: DescribeAclsRequest => 
DescribeAclsRequestDataJsonConverter.write(req.data, request.version)
-      case req: DescribeClientQuotasRequest => 
DescribeClientQuotasRequestDataJsonConverter.write(req.data, request.version)
-      case req: DescribeClusterRequest => 
DescribeClusterRequestDataJsonConverter.write(req.data, request.version)
-      case req: DescribeConfigsRequest => 
DescribeConfigsRequestDataJsonConverter.write(req.data, request.version)
-      case req: DescribeDelegationTokenRequest => 
DescribeDelegationTokenRequestDataJsonConverter.write(req.data, request.version)
-      case req: DescribeGroupsRequest => 
DescribeGroupsRequestDataJsonConverter.write(req.data, request.version)
-      case req: DescribeLogDirsRequest => 
DescribeLogDirsRequestDataJsonConverter.write(req.data, request.version)
-      case req: DescribeProducersRequest => 
DescribeProducersRequestDataJsonConverter.write(req.data, request.version)
-      case req: DescribeQuorumRequest => 
DescribeQuorumRequestDataJsonConverter.write(req.data, request.version)
-      case res: DescribeTopicPartitionsRequest => 
DescribeTopicPartitionsRequestDataJsonConverter.write(res.data, request.version)
-      case req: DescribeTransactionsRequest => 
DescribeTransactionsRequestDataJsonConverter.write(req.data, request.version)
-      case res: DescribeUserScramCredentialsRequest => 
DescribeUserScramCredentialsRequestDataJsonConverter.write(res.data, 
request.version)
-      case req: ElectLeadersRequest => 
ElectLeadersRequestDataJsonConverter.write(req.data, request.version)
-      case req: EndQuorumEpochRequest => 
EndQuorumEpochRequestDataJsonConverter.write(req.data, request.version)
-      case req: EndTxnRequest => 
EndTxnRequestDataJsonConverter.write(req.data, request.version)
-      case req: EnvelopeRequest => 
EnvelopeRequestDataJsonConverter.write(req.data, request.version)
-      case req: ExpireDelegationTokenRequest => 
ExpireDelegationTokenRequestDataJsonConverter.write(req.data, request.version)
-      case req: FetchRequest => FetchRequestDataJsonConverter.write(req.data, 
request.version)
-      case req: FetchSnapshotRequest => 
FetchSnapshotRequestDataJsonConverter.write(req.data, request.version)
-      case req: FindCoordinatorRequest => 
FindCoordinatorRequestDataJsonConverter.write(req.data, request.version)
-      case req: GetTelemetrySubscriptionsRequest => 
GetTelemetrySubscriptionsRequestDataJsonConverter.write(req.data, 
request.version)
-      case req: HeartbeatRequest => 
HeartbeatRequestDataJsonConverter.write(req.data, request.version)
-      case req: IncrementalAlterConfigsRequest => 
IncrementalAlterConfigsRequestDataJsonConverter.write(req.data, request.version)
-      case req: InitializeShareGroupStateRequest => 
InitializeShareGroupStateRequestDataJsonConverter.write(req.data, 
request.version)
-      case req: InitProducerIdRequest => 
InitProducerIdRequestDataJsonConverter.write(req.data, request.version)
-      case req: JoinGroupRequest => 
JoinGroupRequestDataJsonConverter.write(req.data, request.version)
-      case req: LeaderAndIsrRequest => 
LeaderAndIsrRequestDataJsonConverter.write(req.data, request.version)
-      case req: LeaveGroupRequest => 
LeaveGroupRequestDataJsonConverter.write(req.data, request.version)
-      case req: ListClientMetricsResourcesRequest => 
ListClientMetricsResourcesRequestDataJsonConverter.write(req.data, 
request.version)
-      case req: ListGroupsRequest => 
ListGroupsRequestDataJsonConverter.write(req.data, request.version)
-      case req: ListOffsetsRequest => 
ListOffsetsRequestDataJsonConverter.write(req.data, request.version)
-      case req: ListPartitionReassignmentsRequest => 
ListPartitionReassignmentsRequestDataJsonConverter.write(req.data, 
request.version)
-      case req: ListTransactionsRequest => 
ListTransactionsRequestDataJsonConverter.write(req.data, request.version)
-      case req: MetadataRequest => 
MetadataRequestDataJsonConverter.write(req.data, request.version)
-      case req: OffsetCommitRequest => 
OffsetCommitRequestDataJsonConverter.write(req.data, request.version)
-      case req: OffsetDeleteRequest => 
OffsetDeleteRequestDataJsonConverter.write(req.data, request.version)
-      case req: OffsetFetchRequest => 
OffsetFetchRequestDataJsonConverter.write(req.data, request.version)
-      case req: OffsetsForLeaderEpochRequest => 
OffsetForLeaderEpochRequestDataJsonConverter.write(req.data, request.version)
-      case req: ProduceRequest => 
ProduceRequestDataJsonConverter.write(req.data, request.version, false)
-      case req: PushTelemetryRequest => 
PushTelemetryRequestDataJsonConverter.write(req.data, request.version)
-      case req: ReadShareGroupStateRequest => 
ReadShareGroupStateRequestDataJsonConverter.write(req.data, request.version)
-      case req: ReadShareGroupStateSummaryRequest => 
ReadShareGroupStateSummaryRequestDataJsonConverter.write(req.data, 
request.version)
-      case req: RenewDelegationTokenRequest => 
RenewDelegationTokenRequestDataJsonConverter.write(req.data, request.version)
-      case req: SaslAuthenticateRequest => 
SaslAuthenticateRequestDataJsonConverter.write(req.data, request.version)
-      case req: SaslHandshakeRequest => 
SaslHandshakeRequestDataJsonConverter.write(req.data, request.version)
-      case req: ShareAcknowledgeRequest => 
ShareAcknowledgeRequestDataJsonConverter.write(req.data, request.version)
-      case req: ShareFetchRequest => 
ShareFetchRequestDataJsonConverter.write(req.data, request.version)
-      case req: ShareGroupDescribeRequest => 
ShareGroupDescribeRequestDataJsonConverter.write(req.data, request.version)
-      case req: ShareGroupHeartbeatRequest => 
ShareGroupHeartbeatRequestDataJsonConverter.write(req.data, request.version)
-      case req: StopReplicaRequest => 
StopReplicaRequestDataJsonConverter.write(req.data, request.version)
-      case req: SyncGroupRequest => 
SyncGroupRequestDataJsonConverter.write(req.data, request.version)
-      case req: TxnOffsetCommitRequest => 
TxnOffsetCommitRequestDataJsonConverter.write(req.data, request.version)
-      case req: UnregisterBrokerRequest => 
UnregisterBrokerRequestDataJsonConverter.write(req.data, request.version)
-      case req: UpdateFeaturesRequest => 
UpdateFeaturesRequestDataJsonConverter.write(req.data, request.version)
-      case req: UpdateMetadataRequest => 
UpdateMetadataRequestDataJsonConverter.write(req.data, request.version)
-      case req: VoteRequest => VoteRequestDataJsonConverter.write(req.data, 
request.version)
-      case req: WriteShareGroupStateRequest => 
WriteShareGroupStateRequestDataJsonConverter.write(req.data, request.version)
-      case req: WriteTxnMarkersRequest => 
WriteTxnMarkersRequestDataJsonConverter.write(req.data, request.version)
-      case req: AddRaftVoterRequest => 
AddRaftVoterRequestDataJsonConverter.write(req.data, request.version)
-      case req: RemoveRaftVoterRequest => 
RemoveRaftVoterRequestDataJsonConverter.write(req.data, request.version)
-      case req: UpdateRaftVoterRequest => 
UpdateRaftVoterRequestDataJsonConverter.write(req.data, request.version)
-      case _ => throw new IllegalStateException(s"ApiKey ${request.apiKey} is 
not currently handled in `request`, the " +
-        "code should be updated to do so.")
-    }
-  }
-
-  def response(response: AbstractResponse, version: Short): JsonNode = {
-    response match {
-      case res: AddOffsetsToTxnResponse => 
AddOffsetsToTxnResponseDataJsonConverter.write(res.data, version)
-      case res: AddPartitionsToTxnResponse => 
AddPartitionsToTxnResponseDataJsonConverter.write(res.data, version)
-      case res: AllocateProducerIdsResponse => 
AllocateProducerIdsResponseDataJsonConverter.write(res.data, version)
-      case res: AlterClientQuotasResponse => 
AlterClientQuotasResponseDataJsonConverter.write(res.data, version)
-      case res: AlterConfigsResponse => 
AlterConfigsResponseDataJsonConverter.write(res.data, version)
-      case res: AlterPartitionReassignmentsResponse => 
AlterPartitionReassignmentsResponseDataJsonConverter.write(res.data, version)
-      case res: AlterPartitionResponse => 
AlterPartitionResponseDataJsonConverter.write(res.data, version)
-      case res: AlterReplicaLogDirsResponse => 
AlterReplicaLogDirsResponseDataJsonConverter.write(res.data, version)
-      case res: AlterUserScramCredentialsResponse => 
AlterUserScramCredentialsResponseDataJsonConverter.write(res.data, version)
-      case res: ApiVersionsResponse => 
ApiVersionsResponseDataJsonConverter.write(res.data, version)
-      case res: AssignReplicasToDirsResponse => 
AssignReplicasToDirsResponseDataJsonConverter.write(res.data, version)
-      case res: BeginQuorumEpochResponse => 
BeginQuorumEpochResponseDataJsonConverter.write(res.data, version)
-      case res: BrokerHeartbeatResponse => 
BrokerHeartbeatResponseDataJsonConverter.write(res.data, version)
-      case res: BrokerRegistrationResponse => 
BrokerRegistrationResponseDataJsonConverter.write(res.data, version)
-      case res: ConsumerGroupDescribeResponse => 
ConsumerGroupDescribeResponseDataJsonConverter.write(res.data, version)
-      case res: ConsumerGroupHeartbeatResponse => 
ConsumerGroupHeartbeatResponseDataJsonConverter.write(res.data, version)
-      case res: ControlledShutdownResponse => 
ControlledShutdownResponseDataJsonConverter.write(res.data, version)
-      case req: ControllerRegistrationResponse => 
ControllerRegistrationResponseDataJsonConverter.write(req.data, version)
-      case res: CreateAclsResponse => 
CreateAclsResponseDataJsonConverter.write(res.data, version)
-      case res: CreateDelegationTokenResponse => 
CreateDelegationTokenResponseDataJsonConverter.write(res.data, version)
-      case res: CreatePartitionsResponse => 
CreatePartitionsResponseDataJsonConverter.write(res.data, version)
-      case res: CreateTopicsResponse => 
CreateTopicsResponseDataJsonConverter.write(res.data, version)
-      case res: DeleteAclsResponse => 
DeleteAclsResponseDataJsonConverter.write(res.data, version)
-      case res: DeleteGroupsResponse => 
DeleteGroupsResponseDataJsonConverter.write(res.data, version)
-      case res: DeleteRecordsResponse => 
DeleteRecordsResponseDataJsonConverter.write(res.data, version)
-      case res: DeleteShareGroupStateResponse => 
DeleteShareGroupStateResponseDataJsonConverter.write(res.data, version)
-      case res: DeleteTopicsResponse => 
DeleteTopicsResponseDataJsonConverter.write(res.data, version)
-      case res: DescribeAclsResponse => 
DescribeAclsResponseDataJsonConverter.write(res.data, version)
-      case res: DescribeClientQuotasResponse => 
DescribeClientQuotasResponseDataJsonConverter.write(res.data, version)
-      case res: DescribeClusterResponse => 
DescribeClusterResponseDataJsonConverter.write(res.data, version)
-      case res: DescribeConfigsResponse => 
DescribeConfigsResponseDataJsonConverter.write(res.data, version)
-      case res: DescribeDelegationTokenResponse => 
DescribeDelegationTokenResponseDataJsonConverter.write(res.data, version)
-      case res: DescribeGroupsResponse => 
DescribeGroupsResponseDataJsonConverter.write(res.data, version)
-      case res: DescribeLogDirsResponse => 
DescribeLogDirsResponseDataJsonConverter.write(res.data, version)
-      case res: DescribeProducersResponse => 
DescribeProducersResponseDataJsonConverter.write(res.data, version)
-      case res: DescribeQuorumResponse => 
DescribeQuorumResponseDataJsonConverter.write(res.data, version)
-      case res: DescribeTopicPartitionsResponse => 
DescribeTopicPartitionsResponseDataJsonConverter.write(res.data, version)
-      case res: DescribeTransactionsResponse => 
DescribeTransactionsResponseDataJsonConverter.write(res.data, version)
-      case res: DescribeUserScramCredentialsResponse => 
DescribeUserScramCredentialsResponseDataJsonConverter.write(res.data, version)
-      case res: ElectLeadersResponse => 
ElectLeadersResponseDataJsonConverter.write(res.data, version)
-      case res: EndQuorumEpochResponse => 
EndQuorumEpochResponseDataJsonConverter.write(res.data, version)
-      case res: EndTxnResponse => 
EndTxnResponseDataJsonConverter.write(res.data, version)
-      case res: EnvelopeResponse => 
EnvelopeResponseDataJsonConverter.write(res.data, version)
-      case res: ExpireDelegationTokenResponse => 
ExpireDelegationTokenResponseDataJsonConverter.write(res.data, version)
-      case res: FetchResponse => 
FetchResponseDataJsonConverter.write(res.data, version, false)
-      case res: FetchSnapshotResponse => 
FetchSnapshotResponseDataJsonConverter.write(res.data, version)
-      case res: FindCoordinatorResponse => 
FindCoordinatorResponseDataJsonConverter.write(res.data, version)
-      case res: GetTelemetrySubscriptionsResponse => 
GetTelemetrySubscriptionsResponseDataJsonConverter.write(res.data, version)
-      case res: HeartbeatResponse => 
HeartbeatResponseDataJsonConverter.write(res.data, version)
-      case res: IncrementalAlterConfigsResponse => 
IncrementalAlterConfigsResponseDataJsonConverter.write(res.data, version)
-      case res: InitializeShareGroupStateResponse => 
InitializeShareGroupStateResponseDataJsonConverter.write(res.data, version)
-      case res: InitProducerIdResponse => 
InitProducerIdResponseDataJsonConverter.write(res.data, version)
-      case res: JoinGroupResponse => 
JoinGroupResponseDataJsonConverter.write(res.data, version)
-      case res: LeaderAndIsrResponse => 
LeaderAndIsrResponseDataJsonConverter.write(res.data, version)
-      case res: LeaveGroupResponse => 
LeaveGroupResponseDataJsonConverter.write(res.data, version)
-      case res: ListClientMetricsResourcesResponse => 
ListClientMetricsResourcesResponseDataJsonConverter.write(res.data, version)
-      case res: ListGroupsResponse => 
ListGroupsResponseDataJsonConverter.write(res.data, version)
-      case res: ListOffsetsResponse => 
ListOffsetsResponseDataJsonConverter.write(res.data, version)
-      case res: ListPartitionReassignmentsResponse => 
ListPartitionReassignmentsResponseDataJsonConverter.write(res.data, version)
-      case res: ListTransactionsResponse => 
ListTransactionsResponseDataJsonConverter.write(res.data, version)
-      case res: MetadataResponse => 
MetadataResponseDataJsonConverter.write(res.data, version)
-      case res: OffsetCommitResponse => 
OffsetCommitResponseDataJsonConverter.write(res.data, version)
-      case res: OffsetDeleteResponse => 
OffsetDeleteResponseDataJsonConverter.write(res.data, version)
-      case res: OffsetFetchResponse => 
OffsetFetchResponseDataJsonConverter.write(res.data, version)
-      case res: OffsetsForLeaderEpochResponse => 
OffsetForLeaderEpochResponseDataJsonConverter.write(res.data, version)
-      case res: ProduceResponse => 
ProduceResponseDataJsonConverter.write(res.data, version)
-      case res: PushTelemetryResponse => 
PushTelemetryResponseDataJsonConverter.write(res.data, version)
-      case res: ReadShareGroupStateResponse => 
ReadShareGroupStateResponseDataJsonConverter.write(res.data, version)
-      case res: ReadShareGroupStateSummaryResponse => 
ReadShareGroupStateSummaryResponseDataJsonConverter.write(res.data, version)
-      case res: RenewDelegationTokenResponse => 
RenewDelegationTokenResponseDataJsonConverter.write(res.data, version)
-      case res: SaslAuthenticateResponse => 
SaslAuthenticateResponseDataJsonConverter.write(res.data, version)
-      case res: SaslHandshakeResponse => 
SaslHandshakeResponseDataJsonConverter.write(res.data, version)
-      case res: ShareAcknowledgeResponse => 
ShareAcknowledgeResponseDataJsonConverter.write(res.data, version)
-      case res: ShareFetchResponse => 
ShareFetchResponseDataJsonConverter.write(res.data, version)
-      case res: ShareGroupDescribeResponse => 
ShareGroupDescribeResponseDataJsonConverter.write(res.data, version)
-      case res: ShareGroupHeartbeatResponse => 
ShareGroupHeartbeatResponseDataJsonConverter.write(res.data, version)
-      case res: StopReplicaResponse => 
StopReplicaResponseDataJsonConverter.write(res.data, version)
-      case res: SyncGroupResponse => 
SyncGroupResponseDataJsonConverter.write(res.data, version)
-      case res: TxnOffsetCommitResponse => 
TxnOffsetCommitResponseDataJsonConverter.write(res.data, version)
-      case res: UnregisterBrokerResponse => 
UnregisterBrokerResponseDataJsonConverter.write(res.data, version)
-      case res: UpdateFeaturesResponse => 
UpdateFeaturesResponseDataJsonConverter.write(res.data, version)
-      case res: UpdateMetadataResponse => 
UpdateMetadataResponseDataJsonConverter.write(res.data, version)
-      case res: VoteResponse => VoteResponseDataJsonConverter.write(res.data, 
version)
-      case res: WriteShareGroupStateResponse => 
WriteShareGroupStateResponseDataJsonConverter.write(res.data, version)
-      case res: WriteTxnMarkersResponse => 
WriteTxnMarkersResponseDataJsonConverter.write(res.data, version)
-      case res: AddRaftVoterResponse => 
AddRaftVoterResponseDataJsonConverter.write(res.data, version)
-      case res: RemoveRaftVoterResponse => 
RemoveRaftVoterResponseDataJsonConverter.write(res.data, version)
-      case res: UpdateRaftVoterResponse => 
UpdateRaftVoterResponseDataJsonConverter.write(res.data, version)
-      case _ => throw new IllegalStateException(s"ApiKey ${response.apiKey} is 
not currently handled in `response`, the " +
-        "code should be updated to do so.")
-    }
-  }
-
-  def requestHeaderNode(header: RequestHeader): JsonNode = {
-    val node = RequestHeaderDataJsonConverter.write(header.data, 
header.headerVersion, false).asInstanceOf[ObjectNode]
-    node.set("requestApiKeyName", new TextNode(header.apiKey.toString))
-    if (header.apiKey().isVersionDeprecated(header.apiVersion()))
-      node.set("requestApiVersionDeprecated", BooleanNode.TRUE)
-    node
-  }
-
-  def requestDesc(header: RequestHeader, requestNode: Option[JsonNode], 
isForwarded: Boolean): JsonNode = {
-    val node = new ObjectNode(JsonNodeFactory.instance)
-    node.set("isForwarded", if (isForwarded) BooleanNode.TRUE else 
BooleanNode.FALSE)
-    node.set("requestHeader", requestHeaderNode(header))
-    node.set("request", requestNode.getOrElse(new TextNode("")))
-    node
-  }
-
-  def clientInfoNode(clientInfo: ClientInformation): JsonNode = {
-    val node = new ObjectNode(JsonNodeFactory.instance)
-    node.set("softwareName", new TextNode(clientInfo.softwareName))
-    node.set("softwareVersion", new TextNode(clientInfo.softwareVersion))
-    node
-  }
-
-  def requestDescMetrics(header: RequestHeader, requestNode: Option[JsonNode], 
responseNode: Option[JsonNode],
-                         context: RequestContext, session: Session, 
isForwarded: Boolean,
-                         totalTimeMs: Double, requestQueueTimeMs: Double, 
apiLocalTimeMs: Double,
-                         apiRemoteTimeMs: Double, apiThrottleTimeMs: Long, 
responseQueueTimeMs: Double,
-                         responseSendTimeMs: Double, temporaryMemoryBytes: 
Long,
-                         messageConversionsTimeMs: Double): JsonNode = {
-    val node = requestDesc(header, requestNode, 
isForwarded).asInstanceOf[ObjectNode]
-    node.set("response", responseNode.getOrElse(new TextNode("")))
-    node.set("connection", new TextNode(context.connectionId))
-    node.set("totalTimeMs", new DoubleNode(totalTimeMs))
-    node.set("requestQueueTimeMs", new DoubleNode(requestQueueTimeMs))
-    node.set("localTimeMs", new DoubleNode(apiLocalTimeMs))
-    node.set("remoteTimeMs", new DoubleNode(apiRemoteTimeMs))
-    node.set("throttleTimeMs", new LongNode(apiThrottleTimeMs))
-    node.set("responseQueueTimeMs", new DoubleNode(responseQueueTimeMs))
-    node.set("sendTimeMs", new DoubleNode(responseSendTimeMs))
-    node.set("securityProtocol", new 
TextNode(context.securityProtocol.toString))
-    node.set("principal", new TextNode(session.principal.toString))
-    node.set("listener", new TextNode(context.listenerName.value))
-    node.set("clientInformation", clientInfoNode(context.clientInformation))
-    if (temporaryMemoryBytes > 0)
-      node.set("temporaryMemoryBytes", new LongNode(temporaryMemoryBytes))
-    if (messageConversionsTimeMs > 0)
-      node.set("messageConversionsTime", new 
DoubleNode(messageConversionsTimeMs))
-    node
-  }
-}
diff --git a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala 
b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
index 3aaa1458f33..d01d390813b 100644
--- a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
+++ b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
@@ -35,6 +35,7 @@ import org.apache.kafka.common.requests.AlterConfigsRequest._
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, 
KafkaPrincipalSerde, SecurityProtocol}
 import org.apache.kafka.common.utils.{SecurityUtils, Utils}
+import org.apache.kafka.network.RequestConvertToJson
 import org.apache.kafka.network.metrics.RequestChannelMetrics
 import org.apache.kafka.test
 import org.junit.jupiter.api.Assertions._
@@ -49,6 +50,7 @@ import java.nio.ByteBuffer
 import java.util.Collections
 import java.util.concurrent.atomic.AtomicReference
 import scala.collection.{Map, Seq}
+import scala.compat.java8.OptionConverters._
 import scala.jdk.CollectionConverters._
 
 class RequestChannelTest {
@@ -69,7 +71,7 @@ class RequestChannelTest {
       val loggableAlterConfigs = 
alterConfigs.loggableRequest.asInstanceOf[AlterConfigsRequest]
       val loggedConfig = loggableAlterConfigs.configs.get(resource)
       assertEquals(expectedValues, toMap(loggedConfig))
-      val alterConfigsDesc = 
RequestConvertToJson.requestDesc(alterConfigs.header, alterConfigs.requestLog, 
alterConfigs.isForwarded).toString
+      val alterConfigsDesc = 
RequestConvertToJson.requestDesc(alterConfigs.header, 
alterConfigs.requestLog.asJava, alterConfigs.isForwarded).toString
       assertFalse(alterConfigsDesc.contains(sensitiveValue), s"Sensitive 
config logged $alterConfigsDesc")
     }
 
@@ -133,7 +135,7 @@ class RequestChannelTest {
       val loggableAlterConfigs = 
alterConfigs.loggableRequest.asInstanceOf[IncrementalAlterConfigsRequest]
       val loggedConfig = 
loggableAlterConfigs.data.resources.find(resource.`type`.id, 
resource.name).configs
       assertEquals(expectedValues, toMap(loggedConfig))
-      val alterConfigsDesc = 
RequestConvertToJson.requestDesc(alterConfigs.header, alterConfigs.requestLog, 
alterConfigs.isForwarded).toString
+      val alterConfigsDesc = 
RequestConvertToJson.requestDesc(alterConfigs.header, 
alterConfigs.requestLog.asJava, alterConfigs.isForwarded).toString
       assertFalse(alterConfigsDesc.contains(sensitiveValue), s"Sensitive 
config logged $alterConfigsDesc")
     }
 
diff --git 
a/core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala 
b/core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala
index ac9870cc06a..bdc7da74ddd 100644
--- a/core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala
+++ b/core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala
@@ -21,91 +21,22 @@ import java.net.InetAddress
 import java.nio.ByteBuffer
 import com.fasterxml.jackson.databind.node.{BooleanNode, DoubleNode, 
JsonNodeFactory, LongNode, ObjectNode, TextNode}
 import kafka.network
-import kafka.network.RequestConvertToJson.requestHeaderNode
 import org.apache.kafka.common.memory.MemoryPool
 import org.apache.kafka.common.message._
 import org.apache.kafka.common.network.{ClientInformation, ListenerName, 
NetworkSend}
-import org.apache.kafka.common.protocol.{ApiKeys, Errors, MessageUtil}
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
+import org.apache.kafka.network.RequestConvertToJson
 import org.apache.kafka.network.metrics.RequestChannelMetrics
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.Test
 import org.mockito.Mockito.mock
 
 import java.util.Collections
-import scala.collection.mutable.ArrayBuffer
+import scala.compat.java8.OptionConverters._
 
 class RequestConvertToJsonTest {
 
-  @Test
-  def testAllRequestTypesHandled(): Unit = {
-    val unhandledKeys = ArrayBuffer[String]()
-    ApiKeys.values().foreach { key => {
-      val version: Short = key.latestVersion()
-      val message = key match {
-        case ApiKeys.DESCRIBE_ACLS =>
-          
ApiMessageType.fromApiKey(key.id).newRequest().asInstanceOf[DescribeAclsRequestData]
-            
.setPatternTypeFilter(1).setResourceTypeFilter(1).setPermissionType(1).setOperation(1)
-        case _ =>
-          ApiMessageType.fromApiKey(key.id).newRequest()
-      }
-
-      val bytes = MessageUtil.toByteBuffer(message, version)
-      val req = AbstractRequest.parseRequest(key, version, bytes).request
-      try {
-        RequestConvertToJson.request(req)
-      } catch {
-        case _ : IllegalStateException => unhandledKeys += key.toString
-      }
-    }}
-    assertEquals(ArrayBuffer.empty, unhandledKeys, "Unhandled request keys")
-  }
-
-  @Test
-  def testAllApiVersionsResponseHandled(): Unit = {
-
-    ApiKeys.values().foreach { key => {
-      val unhandledVersions = ArrayBuffer[java.lang.Short]()
-      key.allVersions().forEach { version => {
-        val message = key match {
-          // Specify top-level error handling for verifying compatibility 
across versions
-          case ApiKeys.DESCRIBE_LOG_DIRS =>
-            
ApiMessageType.fromApiKey(key.id).newResponse().asInstanceOf[DescribeLogDirsResponseData]
-              .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code())
-          case _ =>
-            ApiMessageType.fromApiKey(key.id).newResponse()
-        }
-
-        val bytes = MessageUtil.toByteBuffer(message, version)
-        val response = AbstractResponse.parseResponse(key, bytes, version)
-        try {
-          RequestConvertToJson.response(response, version)
-        } catch {
-          case _ : IllegalStateException => unhandledVersions += version
-        }}
-      }
-      assertEquals(ArrayBuffer.empty, unhandledVersions, s"API: 
${key.toString} - Unhandled request versions")
-    }}
-  }
-
-  @Test
-  def testAllResponseTypesHandled(): Unit = {
-    val unhandledKeys = ArrayBuffer[String]()
-    ApiKeys.values().foreach { key => {
-      val version: Short = key.latestVersion()
-      val message = ApiMessageType.fromApiKey(key.id).newResponse()
-      val bytes = MessageUtil.toByteBuffer(message, version)
-      val res = AbstractResponse.parseResponse(key, bytes, version)
-      try {
-        RequestConvertToJson.response(res, version)
-      } catch {
-        case _ : IllegalStateException => unhandledKeys += key.toString
-      }
-    }}
-    assertEquals(ArrayBuffer.empty, unhandledKeys, "Unhandled response keys")
-  }
-
   @Test
   def testRequestHeaderNode(): Unit = {
     val alterIsrRequest = new AlterPartitionRequest(new 
AlterPartitionRequestData(), 0)
@@ -135,19 +66,6 @@ class RequestConvertToJsonTest {
     assertEquals(expectedNode, actualNode)
   }
 
-  @Test
-  def testClientInfoNode(): Unit = {
-    val clientInfo = new ClientInformation("name", "1")
-
-    val expectedNode = new ObjectNode(JsonNodeFactory.instance)
-    expectedNode.set("softwareName", new TextNode(clientInfo.softwareName))
-    expectedNode.set("softwareVersion", new 
TextNode(clientInfo.softwareVersion))
-
-    val actualNode = RequestConvertToJson.clientInfoNode(clientInfo)
-
-    assertEquals(expectedNode, actualNode)
-  }
-
   @Test
   def testRequestDesc(): Unit = {
     val alterIsrRequest = new AlterPartitionRequest(new 
AlterPartitionRequestData(), 0)
@@ -155,10 +73,10 @@ class RequestConvertToJsonTest {
 
     val expectedNode = new ObjectNode(JsonNodeFactory.instance)
     expectedNode.set("isForwarded", if (req.isForwarded) BooleanNode.TRUE else 
BooleanNode.FALSE)
-    expectedNode.set("requestHeader", requestHeaderNode(req.header))
+    expectedNode.set("requestHeader", 
RequestConvertToJson.requestHeaderNode(req.header))
     expectedNode.set("request", req.requestLog.getOrElse(new TextNode("")))
 
-    val actualNode = RequestConvertToJson.requestDesc(req.header, 
req.requestLog, req.isForwarded)
+    val actualNode = RequestConvertToJson.requestDesc(req.header, 
req.requestLog.asJava, req.isForwarded)
 
     assertEquals(expectedNode, actualNode)
   }
@@ -181,7 +99,7 @@ class RequestConvertToJsonTest {
     val temporaryMemoryBytes = 8
     val messageConversionsTimeMs = 9
 
-    val expectedNode = RequestConvertToJson.requestDesc(req.header, 
req.requestLog, req.isForwarded).asInstanceOf[ObjectNode]
+    val expectedNode = RequestConvertToJson.requestDesc(req.header, 
req.requestLog.asJava, req.isForwarded).asInstanceOf[ObjectNode]
     expectedNode.set("response", res.responseLog.getOrElse(new TextNode("")))
     expectedNode.set("connection", new TextNode(req.context.connectionId))
     expectedNode.set("totalTimeMs", new DoubleNode(totalTimeMs))
@@ -198,7 +116,7 @@ class RequestConvertToJsonTest {
     expectedNode.set("temporaryMemoryBytes", new 
LongNode(temporaryMemoryBytes))
     expectedNode.set("messageConversionsTime", new 
DoubleNode(messageConversionsTimeMs))
 
-    val actualNode = RequestConvertToJson.requestDescMetrics(req.header, 
req.requestLog, res.responseLog, req.context, req.session, req.isForwarded,
+    val actualNode = RequestConvertToJson.requestDescMetrics(req.header, 
req.requestLog.asJava, res.responseLog.asJava, req.context, req.session, 
req.isForwarded,
       totalTimeMs, requestQueueTimeMs, apiLocalTimeMs, apiRemoteTimeMs, 
apiThrottleTimeMs, responseQueueTimeMs,
       responseSendTimeMs, temporaryMemoryBytes, 
messageConversionsTimeMs).asInstanceOf[ObjectNode]
 
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala 
b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index b7a1c7c32f0..2a34d2aea5f 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -35,6 +35,7 @@ import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.security.scram.internals.ScramMechanism
 import org.apache.kafka.common.utils._
+import org.apache.kafka.network.RequestConvertToJson
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.network.metrics.RequestMetrics
 import org.apache.kafka.security.CredentialProvider
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchRequestBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchRequestBenchmark.java
index eecb619c0e8..f0fa2c32204 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchRequestBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchRequestBenchmark.java
@@ -17,8 +17,6 @@
 
 package org.apache.kafka.jmh.common;
 
-import kafka.network.RequestConvertToJson;
-
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.network.Send;
@@ -27,6 +25,7 @@ import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.ByteBufferChannel;
 import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.network.RequestConvertToJson;
 
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ListOffsetRequestBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ListOffsetRequestBenchmark.java
index f5e8b3e4591..943c5baa145 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ListOffsetRequestBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ListOffsetRequestBenchmark.java
@@ -17,13 +17,12 @@
 
 package org.apache.kafka.jmh.common;
 
-import kafka.network.RequestConvertToJson;
-
 import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.message.ListOffsetsRequestData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.network.RequestConvertToJson;
 
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ProduceRequestBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ProduceRequestBenchmark.java
index 2fcafc5ada1..55ccee8516e 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ProduceRequestBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ProduceRequestBenchmark.java
@@ -17,11 +17,10 @@
 
 package org.apache.kafka.jmh.common;
 
-import kafka.network.RequestConvertToJson;
-
 import org.apache.kafka.common.message.ProduceRequestData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.requests.ProduceRequest;
+import org.apache.kafka.network.RequestConvertToJson;
 
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
index 7b2bd65a621..a56ff3400a7 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
@@ -19,7 +19,6 @@ package org.apache.kafka.jmh.metadata;
 
 import kafka.coordinator.transaction.TransactionCoordinator;
 import kafka.network.RequestChannel;
-import kafka.network.RequestConvertToJson;
 import kafka.server.AutoTopicCreationManager;
 import kafka.server.ClientQuotaManager;
 import kafka.server.ClientRequestQuotaManager;
@@ -58,6 +57,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinator;
 import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.network.RequestConvertToJson;
 import org.apache.kafka.network.metrics.RequestChannelMetrics;
 import org.apache.kafka.raft.QuorumConfig;
 import org.apache.kafka.server.common.FinalizedFeatures;
@@ -237,7 +237,9 @@ public class KRaftMetadataRequestBenchmark {
 
     @Benchmark
     public String testRequestToJson() {
-        return 
RequestConvertToJson.requestDesc(allTopicMetadataRequest.header(), 
allTopicMetadataRequest.requestLog(), 
allTopicMetadataRequest.isForwarded()).toString();
+        Option<com.fasterxml.jackson.databind.JsonNode> option = 
allTopicMetadataRequest.requestLog();
+        Optional<com.fasterxml.jackson.databind.JsonNode> optional = 
option.isDefined() ? Optional.of(option.get()) : Optional.empty();
+        return 
RequestConvertToJson.requestDesc(allTopicMetadataRequest.header(), optional, 
allTopicMetadataRequest.isForwarded()).toString();
     }
 
     @Benchmark
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
index e7acdb5de41..eeebbfaa1a5 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
@@ -20,7 +20,6 @@ package org.apache.kafka.jmh.metadata;
 import kafka.controller.KafkaController;
 import kafka.coordinator.transaction.TransactionCoordinator;
 import kafka.network.RequestChannel;
-import kafka.network.RequestConvertToJson;
 import kafka.server.AutoTopicCreationManager;
 import kafka.server.BrokerFeatures;
 import kafka.server.ClientQuotaManager;
@@ -59,6 +58,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.coordinator.group.GroupCoordinator;
+import org.apache.kafka.network.RequestConvertToJson;
 import org.apache.kafka.network.metrics.RequestChannelMetrics;
 import org.apache.kafka.server.common.FinalizedFeatures;
 import org.apache.kafka.server.common.MetadataVersion;
@@ -237,7 +237,9 @@ public class MetadataRequestBenchmark {
 
     @Benchmark
     public String testRequestToJson() {
-        return 
RequestConvertToJson.requestDesc(allTopicMetadataRequest.header(), 
allTopicMetadataRequest.requestLog(), 
allTopicMetadataRequest.isForwarded()).toString();
+        Option<com.fasterxml.jackson.databind.JsonNode> option = 
allTopicMetadataRequest.requestLog();
+        Optional<com.fasterxml.jackson.databind.JsonNode> optional = 
option.isDefined() ? Optional.of(option.get()) : Optional.empty();
+        return 
RequestConvertToJson.requestDesc(allTopicMetadataRequest.header(), optional, 
allTopicMetadataRequest.isForwarded()).toString();
     }
 
     @Benchmark
diff --git 
a/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java 
b/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java
new file mode 100644
index 00000000000..ac744ef7bac
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java
@@ -0,0 +1,812 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.network;
+
+import org.apache.kafka.common.message.AddOffsetsToTxnRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.AddOffsetsToTxnResponseDataJsonConverter;
+import 
org.apache.kafka.common.message.AddPartitionsToTxnRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.AddPartitionsToTxnResponseDataJsonConverter;
+import org.apache.kafka.common.message.AddRaftVoterRequestDataJsonConverter;
+import org.apache.kafka.common.message.AddRaftVoterResponseDataJsonConverter;
+import 
org.apache.kafka.common.message.AllocateProducerIdsRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.AllocateProducerIdsResponseDataJsonConverter;
+import 
org.apache.kafka.common.message.AlterClientQuotasRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.AlterClientQuotasResponseDataJsonConverter;
+import org.apache.kafka.common.message.AlterConfigsRequestDataJsonConverter;
+import org.apache.kafka.common.message.AlterConfigsResponseDataJsonConverter;
+import 
org.apache.kafka.common.message.AlterPartitionReassignmentsRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.AlterPartitionReassignmentsResponseDataJsonConverter;
+import org.apache.kafka.common.message.AlterPartitionRequestDataJsonConverter;
+import org.apache.kafka.common.message.AlterPartitionResponseDataJsonConverter;
+import 
org.apache.kafka.common.message.AlterReplicaLogDirsRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.AlterReplicaLogDirsResponseDataJsonConverter;
+import 
org.apache.kafka.common.message.AlterUserScramCredentialsRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.AlterUserScramCredentialsResponseDataJsonConverter;
+import org.apache.kafka.common.message.ApiVersionsRequestDataJsonConverter;
+import org.apache.kafka.common.message.ApiVersionsResponseDataJsonConverter;
+import 
org.apache.kafka.common.message.AssignReplicasToDirsRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.AssignReplicasToDirsResponseDataJsonConverter;
+import 
org.apache.kafka.common.message.BeginQuorumEpochRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.BeginQuorumEpochResponseDataJsonConverter;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.BrokerHeartbeatResponseDataJsonConverter;
+import 
org.apache.kafka.common.message.BrokerRegistrationRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.BrokerRegistrationResponseDataJsonConverter;
+import 
org.apache.kafka.common.message.ConsumerGroupDescribeRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.ConsumerGroupDescribeResponseDataJsonConverter;
+import 
org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseDataJsonConverter;
+import 
org.apache.kafka.common.message.ControlledShutdownRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.ControlledShutdownResponseDataJsonConverter;
+import 
org.apache.kafka.common.message.ControllerRegistrationRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.ControllerRegistrationResponseDataJsonConverter;
+import org.apache.kafka.common.message.CreateAclsRequestDataJsonConverter;
+import org.apache.kafka.common.message.CreateAclsResponseDataJsonConverter;
+import 
org.apache.kafka.common.message.CreateDelegationTokenRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.CreateDelegationTokenResponseDataJsonConverter;
+import 
org.apache.kafka.common.message.CreatePartitionsRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.CreatePartitionsResponseDataJsonConverter;
+import org.apache.kafka.common.message.CreateTopicsRequestDataJsonConverter;
+import org.apache.kafka.common.message.CreateTopicsResponseDataJsonConverter;
+import org.apache.kafka.common.message.DeleteAclsRequestDataJsonConverter;
+import org.apache.kafka.common.message.DeleteAclsResponseDataJsonConverter;
+import org.apache.kafka.common.message.DeleteGroupsRequestDataJsonConverter;
+import org.apache.kafka.common.message.DeleteGroupsResponseDataJsonConverter;
+import org.apache.kafka.common.message.DeleteRecordsRequestDataJsonConverter;
+import org.apache.kafka.common.message.DeleteRecordsResponseDataJsonConverter;
+import 
org.apache.kafka.common.message.DeleteShareGroupStateRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.DeleteShareGroupStateResponseDataJsonConverter;
+import org.apache.kafka.common.message.DeleteTopicsRequestDataJsonConverter;
+import org.apache.kafka.common.message.DeleteTopicsResponseDataJsonConverter;
+import org.apache.kafka.common.message.DescribeAclsRequestDataJsonConverter;
+import org.apache.kafka.common.message.DescribeAclsResponseDataJsonConverter;
+import 
org.apache.kafka.common.message.DescribeClientQuotasRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.DescribeClientQuotasResponseDataJsonConverter;
+import org.apache.kafka.common.message.DescribeClusterRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.DescribeClusterResponseDataJsonConverter;
+import org.apache.kafka.common.message.DescribeConfigsRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.DescribeConfigsResponseDataJsonConverter;
+import 
org.apache.kafka.common.message.DescribeDelegationTokenRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.DescribeDelegationTokenResponseDataJsonConverter;
+import org.apache.kafka.common.message.DescribeGroupsRequestDataJsonConverter;
+import org.apache.kafka.common.message.DescribeGroupsResponseDataJsonConverter;
+import org.apache.kafka.common.message.DescribeLogDirsRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.DescribeLogDirsResponseDataJsonConverter;
+import 
org.apache.kafka.common.message.DescribeProducersRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.DescribeProducersResponseDataJsonConverter;
+import org.apache.kafka.common.message.DescribeQuorumRequestDataJsonConverter;
+import org.apache.kafka.common.message.DescribeQuorumResponseDataJsonConverter;
+import 
org.apache.kafka.common.message.DescribeTopicPartitionsRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.DescribeTopicPartitionsResponseDataJsonConverter;
+import 
org.apache.kafka.common.message.DescribeTransactionsRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.DescribeTransactionsResponseDataJsonConverter;
+import 
org.apache.kafka.common.message.DescribeUserScramCredentialsRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.DescribeUserScramCredentialsResponseDataJsonConverter;
+import org.apache.kafka.common.message.ElectLeadersRequestDataJsonConverter;
+import org.apache.kafka.common.message.ElectLeadersResponseDataJsonConverter;
+import org.apache.kafka.common.message.EndQuorumEpochRequestDataJsonConverter;
+import org.apache.kafka.common.message.EndQuorumEpochResponseDataJsonConverter;
+import org.apache.kafka.common.message.EndTxnRequestDataJsonConverter;
+import org.apache.kafka.common.message.EndTxnResponseDataJsonConverter;
+import org.apache.kafka.common.message.EnvelopeRequestDataJsonConverter;
+import org.apache.kafka.common.message.EnvelopeResponseDataJsonConverter;
+import 
org.apache.kafka.common.message.ExpireDelegationTokenRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.ExpireDelegationTokenResponseDataJsonConverter;
+import org.apache.kafka.common.message.FetchRequestDataJsonConverter;
+import org.apache.kafka.common.message.FetchResponseDataJsonConverter;
+import org.apache.kafka.common.message.FetchSnapshotRequestDataJsonConverter;
+import org.apache.kafka.common.message.FetchSnapshotResponseDataJsonConverter;
+import org.apache.kafka.common.message.FindCoordinatorRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.FindCoordinatorResponseDataJsonConverter;
+import 
org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseDataJsonConverter;
+import org.apache.kafka.common.message.HeartbeatRequestDataJsonConverter;
+import org.apache.kafka.common.message.HeartbeatResponseDataJsonConverter;
+import 
org.apache.kafka.common.message.IncrementalAlterConfigsRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.IncrementalAlterConfigsResponseDataJsonConverter;
+import org.apache.kafka.common.message.InitProducerIdRequestDataJsonConverter;
+import org.apache.kafka.common.message.InitProducerIdResponseDataJsonConverter;
+import 
org.apache.kafka.common.message.InitializeShareGroupStateRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.InitializeShareGroupStateResponseDataJsonConverter;
+import org.apache.kafka.common.message.JoinGroupRequestDataJsonConverter;
+import org.apache.kafka.common.message.JoinGroupResponseDataJsonConverter;
+import org.apache.kafka.common.message.LeaderAndIsrRequestDataJsonConverter;
+import org.apache.kafka.common.message.LeaderAndIsrResponseDataJsonConverter;
+import org.apache.kafka.common.message.LeaveGroupRequestDataJsonConverter;
+import org.apache.kafka.common.message.LeaveGroupResponseDataJsonConverter;
+import 
org.apache.kafka.common.message.ListClientMetricsResourcesRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.ListClientMetricsResourcesResponseDataJsonConverter;
+import org.apache.kafka.common.message.ListGroupsRequestDataJsonConverter;
+import org.apache.kafka.common.message.ListGroupsResponseDataJsonConverter;
+import org.apache.kafka.common.message.ListOffsetsRequestDataJsonConverter;
+import org.apache.kafka.common.message.ListOffsetsResponseDataJsonConverter;
+import 
org.apache.kafka.common.message.ListPartitionReassignmentsRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.ListPartitionReassignmentsResponseDataJsonConverter;
+import 
org.apache.kafka.common.message.ListTransactionsRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.ListTransactionsResponseDataJsonConverter;
+import org.apache.kafka.common.message.MetadataRequestDataJsonConverter;
+import org.apache.kafka.common.message.MetadataResponseDataJsonConverter;
+import org.apache.kafka.common.message.OffsetCommitRequestDataJsonConverter;
+import org.apache.kafka.common.message.OffsetCommitResponseDataJsonConverter;
+import org.apache.kafka.common.message.OffsetDeleteRequestDataJsonConverter;
+import org.apache.kafka.common.message.OffsetDeleteResponseDataJsonConverter;
+import org.apache.kafka.common.message.OffsetFetchRequestDataJsonConverter;
+import org.apache.kafka.common.message.OffsetFetchResponseDataJsonConverter;
+import 
org.apache.kafka.common.message.OffsetForLeaderEpochRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseDataJsonConverter;
+import org.apache.kafka.common.message.ProduceRequestDataJsonConverter;
+import org.apache.kafka.common.message.ProduceResponseDataJsonConverter;
+import org.apache.kafka.common.message.PushTelemetryRequestDataJsonConverter;
+import org.apache.kafka.common.message.PushTelemetryResponseDataJsonConverter;
+import 
org.apache.kafka.common.message.ReadShareGroupStateRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.ReadShareGroupStateResponseDataJsonConverter;
+import 
org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.ReadShareGroupStateSummaryResponseDataJsonConverter;
+import org.apache.kafka.common.message.RemoveRaftVoterRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.RemoveRaftVoterResponseDataJsonConverter;
+import 
org.apache.kafka.common.message.RenewDelegationTokenRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.RenewDelegationTokenResponseDataJsonConverter;
+import org.apache.kafka.common.message.RequestHeaderDataJsonConverter;
+import 
org.apache.kafka.common.message.SaslAuthenticateRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.SaslAuthenticateResponseDataJsonConverter;
+import org.apache.kafka.common.message.SaslHandshakeRequestDataJsonConverter;
+import org.apache.kafka.common.message.SaslHandshakeResponseDataJsonConverter;
+import 
org.apache.kafka.common.message.ShareAcknowledgeRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.ShareAcknowledgeResponseDataJsonConverter;
+import org.apache.kafka.common.message.ShareFetchRequestDataJsonConverter;
+import org.apache.kafka.common.message.ShareFetchResponseDataJsonConverter;
+import 
org.apache.kafka.common.message.ShareGroupDescribeRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.ShareGroupDescribeResponseDataJsonConverter;
+import 
org.apache.kafka.common.message.ShareGroupHeartbeatRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.ShareGroupHeartbeatResponseDataJsonConverter;
+import org.apache.kafka.common.message.StopReplicaRequestDataJsonConverter;
+import org.apache.kafka.common.message.StopReplicaResponseDataJsonConverter;
+import org.apache.kafka.common.message.SyncGroupRequestDataJsonConverter;
+import org.apache.kafka.common.message.SyncGroupResponseDataJsonConverter;
+import org.apache.kafka.common.message.TxnOffsetCommitRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.TxnOffsetCommitResponseDataJsonConverter;
+import 
org.apache.kafka.common.message.UnregisterBrokerRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.UnregisterBrokerResponseDataJsonConverter;
+import org.apache.kafka.common.message.UpdateFeaturesRequestDataJsonConverter;
+import org.apache.kafka.common.message.UpdateFeaturesResponseDataJsonConverter;
+import org.apache.kafka.common.message.UpdateMetadataRequestDataJsonConverter;
+import org.apache.kafka.common.message.UpdateMetadataResponseDataJsonConverter;
+import org.apache.kafka.common.message.UpdateRaftVoterRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.UpdateRaftVoterResponseDataJsonConverter;
+import org.apache.kafka.common.message.VoteRequestDataJsonConverter;
+import org.apache.kafka.common.message.VoteResponseDataJsonConverter;
+import 
org.apache.kafka.common.message.WriteShareGroupStateRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.WriteShareGroupStateResponseDataJsonConverter;
+import org.apache.kafka.common.message.WriteTxnMarkersRequestDataJsonConverter;
+import 
org.apache.kafka.common.message.WriteTxnMarkersResponseDataJsonConverter;
+import org.apache.kafka.common.network.ClientInformation;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.AddOffsetsToTxnRequest;
+import org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
+import org.apache.kafka.common.requests.AddPartitionsToTxnRequest;
+import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
+import org.apache.kafka.common.requests.AddRaftVoterRequest;
+import org.apache.kafka.common.requests.AddRaftVoterResponse;
+import org.apache.kafka.common.requests.AllocateProducerIdsRequest;
+import org.apache.kafka.common.requests.AllocateProducerIdsResponse;
+import org.apache.kafka.common.requests.AlterClientQuotasRequest;
+import org.apache.kafka.common.requests.AlterClientQuotasResponse;
+import org.apache.kafka.common.requests.AlterConfigsRequest;
+import org.apache.kafka.common.requests.AlterConfigsResponse;
+import org.apache.kafka.common.requests.AlterPartitionReassignmentsRequest;
+import org.apache.kafka.common.requests.AlterPartitionReassignmentsResponse;
+import org.apache.kafka.common.requests.AlterPartitionRequest;
+import org.apache.kafka.common.requests.AlterPartitionResponse;
+import org.apache.kafka.common.requests.AlterReplicaLogDirsRequest;
+import org.apache.kafka.common.requests.AlterReplicaLogDirsResponse;
+import org.apache.kafka.common.requests.AlterUserScramCredentialsRequest;
+import org.apache.kafka.common.requests.AlterUserScramCredentialsResponse;
+import org.apache.kafka.common.requests.ApiVersionsRequest;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.common.requests.AssignReplicasToDirsRequest;
+import org.apache.kafka.common.requests.AssignReplicasToDirsResponse;
+import org.apache.kafka.common.requests.BeginQuorumEpochRequest;
+import org.apache.kafka.common.requests.BeginQuorumEpochResponse;
+import org.apache.kafka.common.requests.BrokerHeartbeatRequest;
+import org.apache.kafka.common.requests.BrokerHeartbeatResponse;
+import org.apache.kafka.common.requests.BrokerRegistrationRequest;
+import org.apache.kafka.common.requests.BrokerRegistrationResponse;
+import org.apache.kafka.common.requests.ConsumerGroupDescribeRequest;
+import org.apache.kafka.common.requests.ConsumerGroupDescribeResponse;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.requests.ControlledShutdownRequest;
+import org.apache.kafka.common.requests.ControlledShutdownResponse;
+import org.apache.kafka.common.requests.ControllerRegistrationRequest;
+import org.apache.kafka.common.requests.ControllerRegistrationResponse;
+import org.apache.kafka.common.requests.CreateAclsRequest;
+import org.apache.kafka.common.requests.CreateAclsResponse;
+import org.apache.kafka.common.requests.CreateDelegationTokenRequest;
+import org.apache.kafka.common.requests.CreateDelegationTokenResponse;
+import org.apache.kafka.common.requests.CreatePartitionsRequest;
+import org.apache.kafka.common.requests.CreatePartitionsResponse;
+import org.apache.kafka.common.requests.CreateTopicsRequest;
+import org.apache.kafka.common.requests.CreateTopicsResponse;
+import org.apache.kafka.common.requests.DeleteAclsRequest;
+import org.apache.kafka.common.requests.DeleteAclsResponse;
+import org.apache.kafka.common.requests.DeleteGroupsRequest;
+import org.apache.kafka.common.requests.DeleteGroupsResponse;
+import org.apache.kafka.common.requests.DeleteRecordsRequest;
+import org.apache.kafka.common.requests.DeleteRecordsResponse;
+import org.apache.kafka.common.requests.DeleteShareGroupStateRequest;
+import org.apache.kafka.common.requests.DeleteShareGroupStateResponse;
+import org.apache.kafka.common.requests.DeleteTopicsRequest;
+import org.apache.kafka.common.requests.DeleteTopicsResponse;
+import org.apache.kafka.common.requests.DescribeAclsRequest;
+import org.apache.kafka.common.requests.DescribeAclsResponse;
+import org.apache.kafka.common.requests.DescribeClientQuotasRequest;
+import org.apache.kafka.common.requests.DescribeClientQuotasResponse;
+import org.apache.kafka.common.requests.DescribeClusterRequest;
+import org.apache.kafka.common.requests.DescribeClusterResponse;
+import org.apache.kafka.common.requests.DescribeConfigsRequest;
+import org.apache.kafka.common.requests.DescribeConfigsResponse;
+import org.apache.kafka.common.requests.DescribeDelegationTokenRequest;
+import org.apache.kafka.common.requests.DescribeDelegationTokenResponse;
+import org.apache.kafka.common.requests.DescribeGroupsRequest;
+import org.apache.kafka.common.requests.DescribeGroupsResponse;
+import org.apache.kafka.common.requests.DescribeLogDirsRequest;
+import org.apache.kafka.common.requests.DescribeLogDirsResponse;
+import org.apache.kafka.common.requests.DescribeProducersRequest;
+import org.apache.kafka.common.requests.DescribeProducersResponse;
+import org.apache.kafka.common.requests.DescribeQuorumRequest;
+import org.apache.kafka.common.requests.DescribeQuorumResponse;
+import org.apache.kafka.common.requests.DescribeTopicPartitionsRequest;
+import org.apache.kafka.common.requests.DescribeTopicPartitionsResponse;
+import org.apache.kafka.common.requests.DescribeTransactionsRequest;
+import org.apache.kafka.common.requests.DescribeTransactionsResponse;
+import org.apache.kafka.common.requests.DescribeUserScramCredentialsRequest;
+import org.apache.kafka.common.requests.DescribeUserScramCredentialsResponse;
+import org.apache.kafka.common.requests.ElectLeadersRequest;
+import org.apache.kafka.common.requests.ElectLeadersResponse;
+import org.apache.kafka.common.requests.EndQuorumEpochRequest;
+import org.apache.kafka.common.requests.EndQuorumEpochResponse;
+import org.apache.kafka.common.requests.EndTxnRequest;
+import org.apache.kafka.common.requests.EndTxnResponse;
+import org.apache.kafka.common.requests.EnvelopeRequest;
+import org.apache.kafka.common.requests.EnvelopeResponse;
+import org.apache.kafka.common.requests.ExpireDelegationTokenRequest;
+import org.apache.kafka.common.requests.ExpireDelegationTokenResponse;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.requests.FetchSnapshotRequest;
+import org.apache.kafka.common.requests.FetchSnapshotResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.HeartbeatRequest;
+import org.apache.kafka.common.requests.HeartbeatResponse;
+import org.apache.kafka.common.requests.IncrementalAlterConfigsRequest;
+import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse;
+import org.apache.kafka.common.requests.InitProducerIdRequest;
+import org.apache.kafka.common.requests.InitProducerIdResponse;
+import org.apache.kafka.common.requests.InitializeShareGroupStateRequest;
+import org.apache.kafka.common.requests.InitializeShareGroupStateResponse;
+import org.apache.kafka.common.requests.JoinGroupRequest;
+import org.apache.kafka.common.requests.JoinGroupResponse;
+import org.apache.kafka.common.requests.LeaderAndIsrRequest;
+import org.apache.kafka.common.requests.LeaderAndIsrResponse;
+import org.apache.kafka.common.requests.LeaveGroupRequest;
+import org.apache.kafka.common.requests.LeaveGroupResponse;
+import org.apache.kafka.common.requests.ListClientMetricsResourcesRequest;
+import org.apache.kafka.common.requests.ListClientMetricsResourcesResponse;
+import org.apache.kafka.common.requests.ListGroupsRequest;
+import org.apache.kafka.common.requests.ListGroupsResponse;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.requests.ListPartitionReassignmentsRequest;
+import org.apache.kafka.common.requests.ListPartitionReassignmentsResponse;
+import org.apache.kafka.common.requests.ListTransactionsRequest;
+import org.apache.kafka.common.requests.ListTransactionsResponse;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.OffsetCommitResponse;
+import org.apache.kafka.common.requests.OffsetDeleteRequest;
+import org.apache.kafka.common.requests.OffsetDeleteResponse;
+import org.apache.kafka.common.requests.OffsetFetchRequest;
+import org.apache.kafka.common.requests.OffsetFetchResponse;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
+import org.apache.kafka.common.requests.ProduceRequest;
+import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.requests.ReadShareGroupStateRequest;
+import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
+import org.apache.kafka.common.requests.ReadShareGroupStateSummaryRequest;
+import org.apache.kafka.common.requests.ReadShareGroupStateSummaryResponse;
+import org.apache.kafka.common.requests.RemoveRaftVoterRequest;
+import org.apache.kafka.common.requests.RemoveRaftVoterResponse;
+import org.apache.kafka.common.requests.RenewDelegationTokenRequest;
+import org.apache.kafka.common.requests.RenewDelegationTokenResponse;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.requests.SaslAuthenticateRequest;
+import org.apache.kafka.common.requests.SaslAuthenticateResponse;
+import org.apache.kafka.common.requests.SaslHandshakeRequest;
+import org.apache.kafka.common.requests.SaslHandshakeResponse;
+import org.apache.kafka.common.requests.ShareAcknowledgeRequest;
+import org.apache.kafka.common.requests.ShareAcknowledgeResponse;
+import org.apache.kafka.common.requests.ShareFetchRequest;
+import org.apache.kafka.common.requests.ShareFetchResponse;
+import org.apache.kafka.common.requests.ShareGroupDescribeRequest;
+import org.apache.kafka.common.requests.ShareGroupDescribeResponse;
+import org.apache.kafka.common.requests.ShareGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ShareGroupHeartbeatResponse;
+import org.apache.kafka.common.requests.StopReplicaRequest;
+import org.apache.kafka.common.requests.StopReplicaResponse;
+import org.apache.kafka.common.requests.SyncGroupRequest;
+import org.apache.kafka.common.requests.SyncGroupResponse;
+import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
+import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
+import org.apache.kafka.common.requests.UnregisterBrokerRequest;
+import org.apache.kafka.common.requests.UnregisterBrokerResponse;
+import org.apache.kafka.common.requests.UpdateFeaturesRequest;
+import org.apache.kafka.common.requests.UpdateFeaturesResponse;
+import org.apache.kafka.common.requests.UpdateMetadataRequest;
+import org.apache.kafka.common.requests.UpdateMetadataResponse;
+import org.apache.kafka.common.requests.UpdateRaftVoterRequest;
+import org.apache.kafka.common.requests.UpdateRaftVoterResponse;
+import org.apache.kafka.common.requests.VoteRequest;
+import org.apache.kafka.common.requests.VoteResponse;
+import org.apache.kafka.common.requests.WriteShareGroupStateRequest;
+import org.apache.kafka.common.requests.WriteShareGroupStateResponse;
+import org.apache.kafka.common.requests.WriteTxnMarkersRequest;
+import org.apache.kafka.common.requests.WriteTxnMarkersResponse;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.BooleanNode;
+import com.fasterxml.jackson.databind.node.DoubleNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.LongNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+
+import java.util.Optional;
+
+public class RequestConvertToJson {
+
+    public static JsonNode request(AbstractRequest request) {
+        switch (request.apiKey()) {
+            case ADD_OFFSETS_TO_TXN:
+                return 
AddOffsetsToTxnRequestDataJsonConverter.write(((AddOffsetsToTxnRequest) 
request).data(), request.version());
+            case ADD_PARTITIONS_TO_TXN:
+                return 
AddPartitionsToTxnRequestDataJsonConverter.write(((AddPartitionsToTxnRequest) 
request).data(), request.version());
+            case ALLOCATE_PRODUCER_IDS:
+                return 
AllocateProducerIdsRequestDataJsonConverter.write(((AllocateProducerIdsRequest) 
request).data(), request.version());
+            case ALTER_CLIENT_QUOTAS:
+                return 
AlterClientQuotasRequestDataJsonConverter.write(((AlterClientQuotasRequest) 
request).data(), request.version());
+            case ALTER_CONFIGS:
+                return 
AlterConfigsRequestDataJsonConverter.write(((AlterConfigsRequest) 
request).data(), request.version());
+            case ALTER_PARTITION_REASSIGNMENTS:
+                return 
AlterPartitionReassignmentsRequestDataJsonConverter.write(((AlterPartitionReassignmentsRequest)
 request).data(), request.version());
+            case ALTER_PARTITION:
+                return 
AlterPartitionRequestDataJsonConverter.write(((AlterPartitionRequest) 
request).data(), request.version());
+            case ALTER_REPLICA_LOG_DIRS:
+                return 
AlterReplicaLogDirsRequestDataJsonConverter.write(((AlterReplicaLogDirsRequest) 
request).data(), request.version());
+            case ALTER_USER_SCRAM_CREDENTIALS:
+                return 
AlterUserScramCredentialsRequestDataJsonConverter.write(((AlterUserScramCredentialsRequest)
 request).data(), request.version());
+            case API_VERSIONS:
+                return 
ApiVersionsRequestDataJsonConverter.write(((ApiVersionsRequest) 
request).data(), request.version());
+            case ASSIGN_REPLICAS_TO_DIRS:
+                return 
AssignReplicasToDirsRequestDataJsonConverter.write(((AssignReplicasToDirsRequest)
 request).data(), request.version());
+            case BEGIN_QUORUM_EPOCH:
+                return 
BeginQuorumEpochRequestDataJsonConverter.write(((BeginQuorumEpochRequest) 
request).data(), request.version());
+            case BROKER_HEARTBEAT:
+                return 
BrokerHeartbeatRequestDataJsonConverter.write(((BrokerHeartbeatRequest) 
request).data(), request.version());
+            case BROKER_REGISTRATION:
+                return 
BrokerRegistrationRequestDataJsonConverter.write(((BrokerRegistrationRequest) 
request).data(), request.version());
+            case CONSUMER_GROUP_DESCRIBE:
+                return 
ConsumerGroupDescribeRequestDataJsonConverter.write(((ConsumerGroupDescribeRequest)
 request).data(), request.version());
+            case CONSUMER_GROUP_HEARTBEAT:
+                return 
ConsumerGroupHeartbeatRequestDataJsonConverter.write(((ConsumerGroupHeartbeatRequest)
 request).data(), request.version());
+            case CONTROLLED_SHUTDOWN:
+                return 
ControlledShutdownRequestDataJsonConverter.write(((ControlledShutdownRequest) 
request).data(), request.version());
+            case CONTROLLER_REGISTRATION:
+                return 
ControllerRegistrationRequestDataJsonConverter.write(((ControllerRegistrationRequest)
 request).data(), request.version());
+            case CREATE_ACLS:
+                return 
CreateAclsRequestDataJsonConverter.write(((CreateAclsRequest) request).data(), 
request.version());
+            case CREATE_DELEGATION_TOKEN:
+                return 
CreateDelegationTokenRequestDataJsonConverter.write(((CreateDelegationTokenRequest)
 request).data(), request.version());
+            case CREATE_PARTITIONS:
+                return 
CreatePartitionsRequestDataJsonConverter.write(((CreatePartitionsRequest) 
request).data(), request.version());
+            case CREATE_TOPICS:
+                return 
CreateTopicsRequestDataJsonConverter.write(((CreateTopicsRequest) 
request).data(), request.version());
+            case DELETE_ACLS:
+                return 
DeleteAclsRequestDataJsonConverter.write(((DeleteAclsRequest) request).data(), 
request.version());
+            case DELETE_GROUPS:
+                return 
DeleteGroupsRequestDataJsonConverter.write(((DeleteGroupsRequest) 
request).data(), request.version());
+            case DELETE_RECORDS:
+                return 
DeleteRecordsRequestDataJsonConverter.write(((DeleteRecordsRequest) 
request).data(), request.version());
+            case DELETE_SHARE_GROUP_STATE:
+                return 
DeleteShareGroupStateRequestDataJsonConverter.write(((DeleteShareGroupStateRequest)
 request).data(), request.version());
+            case DELETE_TOPICS:
+                return 
DeleteTopicsRequestDataJsonConverter.write(((DeleteTopicsRequest) 
request).data(), request.version());
+            case DESCRIBE_ACLS:
+                return 
DescribeAclsRequestDataJsonConverter.write(((DescribeAclsRequest) 
request).data(), request.version());
+            case DESCRIBE_CLIENT_QUOTAS:
+                return 
DescribeClientQuotasRequestDataJsonConverter.write(((DescribeClientQuotasRequest)
 request).data(), request.version());
+            case DESCRIBE_CLUSTER:
+                return 
DescribeClusterRequestDataJsonConverter.write(((DescribeClusterRequest) 
request).data(), request.version());
+            case DESCRIBE_CONFIGS:
+                return 
DescribeConfigsRequestDataJsonConverter.write(((DescribeConfigsRequest) 
request).data(), request.version());
+            case DESCRIBE_DELEGATION_TOKEN:
+                return 
DescribeDelegationTokenRequestDataJsonConverter.write(((DescribeDelegationTokenRequest)
 request).data(), request.version());
+            case DESCRIBE_GROUPS:
+                return 
DescribeGroupsRequestDataJsonConverter.write(((DescribeGroupsRequest) 
request).data(), request.version());
+            case DESCRIBE_LOG_DIRS:
+                return 
DescribeLogDirsRequestDataJsonConverter.write(((DescribeLogDirsRequest) 
request).data(), request.version());
+            case DESCRIBE_PRODUCERS:
+                return 
DescribeProducersRequestDataJsonConverter.write(((DescribeProducersRequest) 
request).data(), request.version());
+            case DESCRIBE_QUORUM:
+                return 
DescribeQuorumRequestDataJsonConverter.write(((DescribeQuorumRequest) 
request).data(), request.version());
+            case DESCRIBE_TOPIC_PARTITIONS:
+                return 
DescribeTopicPartitionsRequestDataJsonConverter.write(((DescribeTopicPartitionsRequest)
 request).data(), request.version());
+            case DESCRIBE_TRANSACTIONS:
+                return 
DescribeTransactionsRequestDataJsonConverter.write(((DescribeTransactionsRequest)
 request).data(), request.version());
+            case DESCRIBE_USER_SCRAM_CREDENTIALS:
+                return 
DescribeUserScramCredentialsRequestDataJsonConverter.write(((DescribeUserScramCredentialsRequest)
 request).data(), request.version());
+            case ELECT_LEADERS:
+                return 
ElectLeadersRequestDataJsonConverter.write(((ElectLeadersRequest) 
request).data(), request.version());
+            case END_QUORUM_EPOCH:
+                return 
EndQuorumEpochRequestDataJsonConverter.write(((EndQuorumEpochRequest) 
request).data(), request.version());
+            case END_TXN:
+                return EndTxnRequestDataJsonConverter.write(((EndTxnRequest) 
request).data(), request.version());
+            case ENVELOPE:
+                return 
EnvelopeRequestDataJsonConverter.write(((EnvelopeRequest) request).data(), 
request.version());
+            case EXPIRE_DELEGATION_TOKEN:
+                return 
ExpireDelegationTokenRequestDataJsonConverter.write(((ExpireDelegationTokenRequest)
 request).data(), request.version());
+            case FETCH:
+                return FetchRequestDataJsonConverter.write(((FetchRequest) 
request).data(), request.version());
+            case FETCH_SNAPSHOT:
+                return 
FetchSnapshotRequestDataJsonConverter.write(((FetchSnapshotRequest) 
request).data(), request.version());
+            case FIND_COORDINATOR:
+                return 
FindCoordinatorRequestDataJsonConverter.write(((FindCoordinatorRequest) 
request).data(), request.version());
+            case GET_TELEMETRY_SUBSCRIPTIONS:
+                return 
GetTelemetrySubscriptionsRequestDataJsonConverter.write(((GetTelemetrySubscriptionsRequest)
 request).data(), request.version());
+            case HEARTBEAT:
+                return 
HeartbeatRequestDataJsonConverter.write(((HeartbeatRequest) request).data(), 
request.version());
+            case INCREMENTAL_ALTER_CONFIGS:
+                return 
IncrementalAlterConfigsRequestDataJsonConverter.write(((IncrementalAlterConfigsRequest)
 request).data(), request.version());
+            case INITIALIZE_SHARE_GROUP_STATE:
+                return 
InitializeShareGroupStateRequestDataJsonConverter.write(((InitializeShareGroupStateRequest)
 request).data(), request.version());
+            case INIT_PRODUCER_ID:
+                return 
InitProducerIdRequestDataJsonConverter.write(((InitProducerIdRequest) 
request).data(), request.version());
+            case JOIN_GROUP:
+                return 
JoinGroupRequestDataJsonConverter.write(((JoinGroupRequest) request).data(), 
request.version());
+            case LEADER_AND_ISR:
+                return 
LeaderAndIsrRequestDataJsonConverter.write(((LeaderAndIsrRequest) 
request).data(), request.version());
+            case LEAVE_GROUP:
+                return 
LeaveGroupRequestDataJsonConverter.write(((LeaveGroupRequest) request).data(), 
request.version());
+            case LIST_CLIENT_METRICS_RESOURCES:
+                return 
ListClientMetricsResourcesRequestDataJsonConverter.write(((ListClientMetricsResourcesRequest)
 request).data(), request.version());
+            case LIST_GROUPS:
+                return 
ListGroupsRequestDataJsonConverter.write(((ListGroupsRequest) request).data(), 
request.version());
+            case LIST_OFFSETS:
+                return 
ListOffsetsRequestDataJsonConverter.write(((ListOffsetsRequest) 
request).data(), request.version());
+            case LIST_PARTITION_REASSIGNMENTS:
+                return 
ListPartitionReassignmentsRequestDataJsonConverter.write(((ListPartitionReassignmentsRequest)
 request).data(), request.version());
+            case LIST_TRANSACTIONS:
+                return 
ListTransactionsRequestDataJsonConverter.write(((ListTransactionsRequest) 
request).data(), request.version());
+            case METADATA:
+                return 
MetadataRequestDataJsonConverter.write(((MetadataRequest) request).data(), 
request.version());
+            case OFFSET_COMMIT:
+                return 
OffsetCommitRequestDataJsonConverter.write(((OffsetCommitRequest) 
request).data(), request.version());
+            case OFFSET_DELETE:
+                return 
OffsetDeleteRequestDataJsonConverter.write(((OffsetDeleteRequest) 
request).data(), request.version());
+            case OFFSET_FETCH:
+                return 
OffsetFetchRequestDataJsonConverter.write(((OffsetFetchRequest) 
request).data(), request.version());
+            case OFFSET_FOR_LEADER_EPOCH:
+                return 
OffsetForLeaderEpochRequestDataJsonConverter.write(((OffsetsForLeaderEpochRequest)
 request).data(), request.version());
+            case PRODUCE:
+                return ProduceRequestDataJsonConverter.write(((ProduceRequest) 
request).data(), request.version(), false);
+            case PUSH_TELEMETRY:
+                return 
PushTelemetryRequestDataJsonConverter.write(((PushTelemetryRequest) 
request).data(), request.version());
+            case READ_SHARE_GROUP_STATE:
+                return 
ReadShareGroupStateRequestDataJsonConverter.write(((ReadShareGroupStateRequest) 
request).data(), request.version());
+            case READ_SHARE_GROUP_STATE_SUMMARY:
+                return 
ReadShareGroupStateSummaryRequestDataJsonConverter.write(((ReadShareGroupStateSummaryRequest)
 request).data(), request.version());
+            case RENEW_DELEGATION_TOKEN:
+                return 
RenewDelegationTokenRequestDataJsonConverter.write(((RenewDelegationTokenRequest)
 request).data(), request.version());
+            case SASL_AUTHENTICATE:
+                return 
SaslAuthenticateRequestDataJsonConverter.write(((SaslAuthenticateRequest) 
request).data(), request.version());
+            case SASL_HANDSHAKE:
+                return 
SaslHandshakeRequestDataJsonConverter.write(((SaslHandshakeRequest) 
request).data(), request.version());
+            case SHARE_ACKNOWLEDGE:
+                return 
ShareAcknowledgeRequestDataJsonConverter.write(((ShareAcknowledgeRequest) 
request).data(), request.version());
+            case SHARE_FETCH:
+                return 
ShareFetchRequestDataJsonConverter.write(((ShareFetchRequest) request).data(), 
request.version());
+            case SHARE_GROUP_DESCRIBE:
+                return 
ShareGroupDescribeRequestDataJsonConverter.write(((ShareGroupDescribeRequest) 
request).data(), request.version());
+            case SHARE_GROUP_HEARTBEAT:
+                return 
ShareGroupHeartbeatRequestDataJsonConverter.write(((ShareGroupHeartbeatRequest) 
request).data(), request.version());
+            case STOP_REPLICA:
+                return 
StopReplicaRequestDataJsonConverter.write(((StopReplicaRequest) 
request).data(), request.version());
+            case SYNC_GROUP:
+                return 
SyncGroupRequestDataJsonConverter.write(((SyncGroupRequest) request).data(), 
request.version());
+            case TXN_OFFSET_COMMIT:
+                return 
TxnOffsetCommitRequestDataJsonConverter.write(((TxnOffsetCommitRequest) 
request).data(), request.version());
+            case UNREGISTER_BROKER:
+                return 
UnregisterBrokerRequestDataJsonConverter.write(((UnregisterBrokerRequest) 
request).data(), request.version());
+            case UPDATE_FEATURES:
+                return 
UpdateFeaturesRequestDataJsonConverter.write(((UpdateFeaturesRequest) 
request).data(), request.version());
+            case UPDATE_METADATA:
+                return 
UpdateMetadataRequestDataJsonConverter.write(((UpdateMetadataRequest) 
request).data(), request.version());
+            case VOTE:
+                return VoteRequestDataJsonConverter.write(((VoteRequest) 
request).data(), request.version());
+            case WRITE_SHARE_GROUP_STATE:
+                return 
WriteShareGroupStateRequestDataJsonConverter.write(((WriteShareGroupStateRequest)
 request).data(), request.version());
+            case WRITE_TXN_MARKERS:
+                return 
WriteTxnMarkersRequestDataJsonConverter.write(((WriteTxnMarkersRequest) 
request).data(), request.version());
+            case ADD_RAFT_VOTER:
+                return 
AddRaftVoterRequestDataJsonConverter.write(((AddRaftVoterRequest) 
request).data(), request.version());
+            case REMOVE_RAFT_VOTER:
+                return 
RemoveRaftVoterRequestDataJsonConverter.write(((RemoveRaftVoterRequest) 
request).data(), request.version());
+            case UPDATE_RAFT_VOTER:
+                return 
UpdateRaftVoterRequestDataJsonConverter.write(((UpdateRaftVoterRequest) 
request).data(), request.version());
+            default:
+                throw new IllegalStateException("ApiKey " + request.apiKey() + 
" is not currently handled in `request`, the " +
+                    "code should be updated to do so.");
+        }
+    }
+
+    public static JsonNode response(AbstractResponse response, short version) {
+        switch (response.apiKey()) {
+            case ADD_OFFSETS_TO_TXN:
+                return 
AddOffsetsToTxnResponseDataJsonConverter.write(((AddOffsetsToTxnResponse) 
response).data(), version);
+            case ADD_PARTITIONS_TO_TXN:
+                return 
AddPartitionsToTxnResponseDataJsonConverter.write(((AddPartitionsToTxnResponse) 
response).data(), version);
+            case ALLOCATE_PRODUCER_IDS:
+                return 
AllocateProducerIdsResponseDataJsonConverter.write(((AllocateProducerIdsResponse)
 response).data(), version);
+            case ALTER_CLIENT_QUOTAS:
+                return 
AlterClientQuotasResponseDataJsonConverter.write(((AlterClientQuotasResponse) 
response).data(), version);
+            case ALTER_CONFIGS:
+                return 
AlterConfigsResponseDataJsonConverter.write(((AlterConfigsResponse) 
response).data(), version);
+            case ALTER_PARTITION_REASSIGNMENTS:
+                return 
AlterPartitionReassignmentsResponseDataJsonConverter.write(((AlterPartitionReassignmentsResponse)
 response).data(), version);
+            case ALTER_PARTITION:
+                return 
AlterPartitionResponseDataJsonConverter.write(((AlterPartitionResponse) 
response).data(), version);
+            case ALTER_REPLICA_LOG_DIRS:
+                return 
AlterReplicaLogDirsResponseDataJsonConverter.write(((AlterReplicaLogDirsResponse)
 response).data(), version);
+            case ALTER_USER_SCRAM_CREDENTIALS:
+                return 
AlterUserScramCredentialsResponseDataJsonConverter.write(((AlterUserScramCredentialsResponse)
 response).data(), version);
+            case API_VERSIONS:
+                return 
ApiVersionsResponseDataJsonConverter.write(((ApiVersionsResponse) 
response).data(), version);
+            case ASSIGN_REPLICAS_TO_DIRS:
+                return 
AssignReplicasToDirsResponseDataJsonConverter.write(((AssignReplicasToDirsResponse)
 response).data(), version);
+            case BEGIN_QUORUM_EPOCH:
+                return 
BeginQuorumEpochResponseDataJsonConverter.write(((BeginQuorumEpochResponse) 
response).data(), version);
+            case BROKER_HEARTBEAT:
+                return 
BrokerHeartbeatResponseDataJsonConverter.write(((BrokerHeartbeatResponse) 
response).data(), version);
+            case BROKER_REGISTRATION:
+                return 
BrokerRegistrationResponseDataJsonConverter.write(((BrokerRegistrationResponse) 
response).data(), version);
+            case CONSUMER_GROUP_DESCRIBE:
+                return 
ConsumerGroupDescribeResponseDataJsonConverter.write(((ConsumerGroupDescribeResponse)
 response).data(), version);
+            case CONSUMER_GROUP_HEARTBEAT:
+                return 
ConsumerGroupHeartbeatResponseDataJsonConverter.write(((ConsumerGroupHeartbeatResponse)
 response).data(), version);
+            case CONTROLLED_SHUTDOWN:
+                return 
ControlledShutdownResponseDataJsonConverter.write(((ControlledShutdownResponse) 
response).data(), version);
+            case CONTROLLER_REGISTRATION:
+                return 
ControllerRegistrationResponseDataJsonConverter.write(((ControllerRegistrationResponse)
 response).data(), version);
+            case CREATE_ACLS:
+                return 
CreateAclsResponseDataJsonConverter.write(((CreateAclsResponse) 
response).data(), version);
+            case CREATE_DELEGATION_TOKEN:
+                return 
CreateDelegationTokenResponseDataJsonConverter.write(((CreateDelegationTokenResponse)
 response).data(), version);
+            case CREATE_PARTITIONS:
+                return 
CreatePartitionsResponseDataJsonConverter.write(((CreatePartitionsResponse) 
response).data(), version);
+            case CREATE_TOPICS:
+                return 
CreateTopicsResponseDataJsonConverter.write(((CreateTopicsResponse) 
response).data(), version);
+            case DELETE_ACLS:
+                return 
DeleteAclsResponseDataJsonConverter.write(((DeleteAclsResponse) 
response).data(), version);
+            case DELETE_GROUPS:
+                return 
DeleteGroupsResponseDataJsonConverter.write(((DeleteGroupsResponse) 
response).data(), version);
+            case DELETE_RECORDS:
+                return 
DeleteRecordsResponseDataJsonConverter.write(((DeleteRecordsResponse) 
response).data(), version);
+            case DELETE_SHARE_GROUP_STATE:
+                return 
DeleteShareGroupStateResponseDataJsonConverter.write(((DeleteShareGroupStateResponse)
 response).data(), version);
+            case DELETE_TOPICS:
+                return 
DeleteTopicsResponseDataJsonConverter.write(((DeleteTopicsResponse) 
response).data(), version);
+            case DESCRIBE_ACLS:
+                return 
DescribeAclsResponseDataJsonConverter.write(((DescribeAclsResponse) 
response).data(), version);
+            case DESCRIBE_CLIENT_QUOTAS:
+                return 
DescribeClientQuotasResponseDataJsonConverter.write(((DescribeClientQuotasResponse)
 response).data(), version);
+            case DESCRIBE_CLUSTER:
+                return 
DescribeClusterResponseDataJsonConverter.write(((DescribeClusterResponse) 
response).data(), version);
+            case DESCRIBE_CONFIGS:
+                return 
DescribeConfigsResponseDataJsonConverter.write(((DescribeConfigsResponse) 
response).data(), version);
+            case DESCRIBE_DELEGATION_TOKEN:
+                return 
DescribeDelegationTokenResponseDataJsonConverter.write(((DescribeDelegationTokenResponse)
 response).data(), version);
+            case DESCRIBE_GROUPS:
+                return 
DescribeGroupsResponseDataJsonConverter.write(((DescribeGroupsResponse) 
response).data(), version);
+            case DESCRIBE_LOG_DIRS:
+                return 
DescribeLogDirsResponseDataJsonConverter.write(((DescribeLogDirsResponse) 
response).data(), version);
+            case DESCRIBE_PRODUCERS:
+                return 
DescribeProducersResponseDataJsonConverter.write(((DescribeProducersResponse) 
response).data(), version);
+            case DESCRIBE_QUORUM:
+                return 
DescribeQuorumResponseDataJsonConverter.write(((DescribeQuorumResponse) 
response).data(), version);
+            case DESCRIBE_TOPIC_PARTITIONS:
+                return 
DescribeTopicPartitionsResponseDataJsonConverter.write(((DescribeTopicPartitionsResponse)
 response).data(), version);
+            case DESCRIBE_TRANSACTIONS:
+                return 
DescribeTransactionsResponseDataJsonConverter.write(((DescribeTransactionsResponse)
 response).data(), version);
+            case DESCRIBE_USER_SCRAM_CREDENTIALS:
+                return 
DescribeUserScramCredentialsResponseDataJsonConverter.write(((DescribeUserScramCredentialsResponse)
 response).data(), version);
+            case ELECT_LEADERS:
+                return 
ElectLeadersResponseDataJsonConverter.write(((ElectLeadersResponse) 
response).data(), version);
+            case END_QUORUM_EPOCH:
+                return 
EndQuorumEpochResponseDataJsonConverter.write(((EndQuorumEpochResponse) 
response).data(), version);
+            case END_TXN:
+                return EndTxnResponseDataJsonConverter.write(((EndTxnResponse) 
response).data(), version);
+            case ENVELOPE:
+                return 
EnvelopeResponseDataJsonConverter.write(((EnvelopeResponse) response).data(), 
version);
+            case EXPIRE_DELEGATION_TOKEN:
+                return 
ExpireDelegationTokenResponseDataJsonConverter.write(((ExpireDelegationTokenResponse)
 response).data(), version);
+            case FETCH:
+                return FetchResponseDataJsonConverter.write(((FetchResponse) 
response).data(), version, false);
+            case FETCH_SNAPSHOT:
+                return 
FetchSnapshotResponseDataJsonConverter.write(((FetchSnapshotResponse) 
response).data(), version);
+            case FIND_COORDINATOR:
+                return 
FindCoordinatorResponseDataJsonConverter.write(((FindCoordinatorResponse) 
response).data(), version);
+            case GET_TELEMETRY_SUBSCRIPTIONS:
+                return 
GetTelemetrySubscriptionsResponseDataJsonConverter.write(((GetTelemetrySubscriptionsResponse)
 response).data(), version);
+            case HEARTBEAT:
+                return 
HeartbeatResponseDataJsonConverter.write(((HeartbeatResponse) response).data(), 
version);
+            case INCREMENTAL_ALTER_CONFIGS:
+                return 
IncrementalAlterConfigsResponseDataJsonConverter.write(((IncrementalAlterConfigsResponse)
 response).data(), version);
+            case INITIALIZE_SHARE_GROUP_STATE:
+                return 
InitializeShareGroupStateResponseDataJsonConverter.write(((InitializeShareGroupStateResponse)
 response).data(), version);
+            case INIT_PRODUCER_ID:
+                return 
InitProducerIdResponseDataJsonConverter.write(((InitProducerIdResponse) 
response).data(), version);
+            case JOIN_GROUP:
+                return 
JoinGroupResponseDataJsonConverter.write(((JoinGroupResponse) response).data(), 
version);
+            case LEADER_AND_ISR:
+                return 
LeaderAndIsrResponseDataJsonConverter.write(((LeaderAndIsrResponse) 
response).data(), version);
+            case LEAVE_GROUP:
+                return 
LeaveGroupResponseDataJsonConverter.write(((LeaveGroupResponse) 
response).data(), version);
+            case LIST_CLIENT_METRICS_RESOURCES:
+                return 
ListClientMetricsResourcesResponseDataJsonConverter.write(((ListClientMetricsResourcesResponse)
 response).data(), version);
+            case LIST_GROUPS:
+                return 
ListGroupsResponseDataJsonConverter.write(((ListGroupsResponse) 
response).data(), version);
+            case LIST_OFFSETS:
+                return 
ListOffsetsResponseDataJsonConverter.write(((ListOffsetsResponse) 
response).data(), version);
+            case LIST_PARTITION_REASSIGNMENTS:
+                return 
ListPartitionReassignmentsResponseDataJsonConverter.write(((ListPartitionReassignmentsResponse)
 response).data(), version);
+            case LIST_TRANSACTIONS:
+                return 
ListTransactionsResponseDataJsonConverter.write(((ListTransactionsResponse) 
response).data(), version);
+            case METADATA:
+                return 
MetadataResponseDataJsonConverter.write(((MetadataResponse) response).data(), 
version);
+            case OFFSET_COMMIT:
+                return 
OffsetCommitResponseDataJsonConverter.write(((OffsetCommitResponse) 
response).data(), version);
+            case OFFSET_DELETE:
+                return 
OffsetDeleteResponseDataJsonConverter.write(((OffsetDeleteResponse) 
response).data(), version);
+            case OFFSET_FETCH:
+                return 
OffsetFetchResponseDataJsonConverter.write(((OffsetFetchResponse) 
response).data(), version);
+            case OFFSET_FOR_LEADER_EPOCH:
+                return 
OffsetForLeaderEpochResponseDataJsonConverter.write(((OffsetsForLeaderEpochResponse)
 response).data(), version);
+            case PRODUCE:
+                return 
ProduceResponseDataJsonConverter.write(((ProduceResponse) response).data(), 
version);
+            case PUSH_TELEMETRY:
+                return 
PushTelemetryResponseDataJsonConverter.write(((PushTelemetryResponse) 
response).data(), version);
+            case READ_SHARE_GROUP_STATE:
+                return 
ReadShareGroupStateResponseDataJsonConverter.write(((ReadShareGroupStateResponse)
 response).data(), version);
+            case READ_SHARE_GROUP_STATE_SUMMARY:
+                return 
ReadShareGroupStateSummaryResponseDataJsonConverter.write(((ReadShareGroupStateSummaryResponse)
 response).data(), version);
+            case RENEW_DELEGATION_TOKEN:
+                return 
RenewDelegationTokenResponseDataJsonConverter.write(((RenewDelegationTokenResponse)
 response).data(), version);
+            case SASL_AUTHENTICATE:
+                return 
SaslAuthenticateResponseDataJsonConverter.write(((SaslAuthenticateResponse) 
response).data(), version);
+            case SASL_HANDSHAKE:
+                return 
SaslHandshakeResponseDataJsonConverter.write(((SaslHandshakeResponse) 
response).data(), version);
+            case SHARE_ACKNOWLEDGE:
+                return 
ShareAcknowledgeResponseDataJsonConverter.write(((ShareAcknowledgeResponse) 
response).data(), version);
+            case SHARE_FETCH:
+                return 
ShareFetchResponseDataJsonConverter.write(((ShareFetchResponse) 
response).data(), version);
+            case SHARE_GROUP_DESCRIBE:
+                return 
ShareGroupDescribeResponseDataJsonConverter.write(((ShareGroupDescribeResponse) 
response).data(), version);
+            case SHARE_GROUP_HEARTBEAT:
+                return 
ShareGroupHeartbeatResponseDataJsonConverter.write(((ShareGroupHeartbeatResponse)
 response).data(), version);
+            case STOP_REPLICA:
+                return 
StopReplicaResponseDataJsonConverter.write(((StopReplicaResponse) 
response).data(), version);
+            case SYNC_GROUP:
+                return 
SyncGroupResponseDataJsonConverter.write(((SyncGroupResponse) response).data(), 
version);
+            case TXN_OFFSET_COMMIT:
+                return 
TxnOffsetCommitResponseDataJsonConverter.write(((TxnOffsetCommitResponse) 
response).data(), version);
+            case UNREGISTER_BROKER:
+                return 
UnregisterBrokerResponseDataJsonConverter.write(((UnregisterBrokerResponse) 
response).data(), version);
+            case UPDATE_FEATURES:
+                return 
UpdateFeaturesResponseDataJsonConverter.write(((UpdateFeaturesResponse) 
response).data(), version);
+            case UPDATE_METADATA:
+                return 
UpdateMetadataResponseDataJsonConverter.write(((UpdateMetadataResponse) 
response).data(), version);
+            case VOTE:
+                return VoteResponseDataJsonConverter.write(((VoteResponse) 
response).data(), version);
+            case WRITE_SHARE_GROUP_STATE:
+                return 
WriteShareGroupStateResponseDataJsonConverter.write(((WriteShareGroupStateResponse)
 response).data(), version);
+            case WRITE_TXN_MARKERS:
+                return 
WriteTxnMarkersResponseDataJsonConverter.write(((WriteTxnMarkersResponse) 
response).data(), version);
+            case ADD_RAFT_VOTER:
+                return 
AddRaftVoterResponseDataJsonConverter.write(((AddRaftVoterResponse) 
response).data(), version);
+            case REMOVE_RAFT_VOTER:
+                return 
RemoveRaftVoterResponseDataJsonConverter.write(((RemoveRaftVoterResponse) 
response).data(), version);
+            case UPDATE_RAFT_VOTER:
+                return 
UpdateRaftVoterResponseDataJsonConverter.write(((UpdateRaftVoterResponse) 
response).data(), version);
+            default:
+                throw new IllegalStateException("ApiKey " + response.apiKey() 
+ " is not currently handled in `response`, the " +
+                    "code should be updated to do so.");
+        }
+    }
+
+    public static JsonNode requestHeaderNode(RequestHeader header) {
+        ObjectNode node = (ObjectNode) RequestHeaderDataJsonConverter.write(
+            header.data(), header.headerVersion(), false
+        );
+        node.set("requestApiKeyName", new 
TextNode(header.apiKey().toString()));
+        if (header.apiKey().isVersionDeprecated(header.apiVersion())) {
+            node.set("requestApiVersionDeprecated", BooleanNode.TRUE);
+        }
+        return node;
+    }
+
+    public static JsonNode requestDesc(RequestHeader header, 
Optional<JsonNode> requestNode, boolean isForwarded) {
+        ObjectNode node = JsonNodeFactory.instance.objectNode();
+        node.set("isForwarded", isForwarded ? BooleanNode.TRUE : 
BooleanNode.FALSE);
+        node.set("requestHeader", requestHeaderNode(header));
+        node.set("request", requestNode.orElse(new TextNode("")));
+        return node;
+    }
+
+    public static JsonNode clientInfoNode(ClientInformation clientInfo) {
+        ObjectNode node = JsonNodeFactory.instance.objectNode();
+        node.set("softwareName", new TextNode(clientInfo.softwareName()));
+        node.set("softwareVersion", new 
TextNode(clientInfo.softwareVersion()));
+        return node;
+    }
+
+    public static JsonNode requestDescMetrics(RequestHeader header, 
Optional<JsonNode> requestNode, Optional<JsonNode> responseNode,
+                                              RequestContext context, Session 
session, boolean isForwarded,
+                                              double totalTimeMs, double 
requestQueueTimeMs, double apiLocalTimeMs,
+                                              double apiRemoteTimeMs, long 
apiThrottleTimeMs, double responseQueueTimeMs,
+                                              double responseSendTimeMs, long 
temporaryMemoryBytes,
+                                              double messageConversionsTimeMs) 
{
+        ObjectNode node = (ObjectNode) requestDesc(header, requestNode, 
isForwarded);
+        node.set("response", responseNode.orElse(new TextNode("")));
+        node.set("connection", new TextNode(context.connectionId));
+        node.set("totalTimeMs", new DoubleNode(totalTimeMs));
+        node.set("requestQueueTimeMs", new DoubleNode(requestQueueTimeMs));
+        node.set("localTimeMs", new DoubleNode(apiLocalTimeMs));
+        node.set("remoteTimeMs", new DoubleNode(apiRemoteTimeMs));
+        node.set("throttleTimeMs", new LongNode(apiThrottleTimeMs));
+        node.set("responseQueueTimeMs", new DoubleNode(responseQueueTimeMs));
+        node.set("sendTimeMs", new DoubleNode(responseSendTimeMs));
+        node.set("securityProtocol", new 
TextNode(context.securityProtocol.toString()));
+        node.set("principal", new TextNode(session.principal.toString()));
+        node.set("listener", new TextNode(context.listenerName.value()));
+        node.set("clientInformation", 
clientInfoNode(context.clientInformation));
+        if (temporaryMemoryBytes > 0) {
+            node.set("temporaryMemoryBytes", new 
LongNode(temporaryMemoryBytes));
+        }
+        if (messageConversionsTimeMs > 0) {
+            node.set("messageConversionsTime", new 
DoubleNode(messageConversionsTimeMs));
+        }
+        return node;
+    }
+}
diff --git 
a/server/src/test/java/org/apache/kafka/network/RequestConvertToJsonTest.java 
b/server/src/test/java/org/apache/kafka/network/RequestConvertToJsonTest.java
new file mode 100644
index 00000000000..56c5a822432
--- /dev/null
+++ 
b/server/src/test/java/org/apache/kafka/network/RequestConvertToJsonTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.network;
+
+import org.apache.kafka.common.message.ApiMessageType;
+import org.apache.kafka.common.message.DescribeAclsRequestData;
+import org.apache.kafka.common.message.DescribeLogDirsResponseData;
+import org.apache.kafka.common.network.ClientInformation;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.MessageUtil;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class RequestConvertToJsonTest {
+
+    @Test
+    public void testAllRequestTypesHandled() {
+        List<String> unhandledKeys = new ArrayList<>();
+        for (ApiKeys key : ApiKeys.values()) {
+            short version = key.latestVersion();
+            ApiMessage message;
+            if (key == ApiKeys.DESCRIBE_ACLS) {
+                message = ApiMessageType.fromApiKey(key.id).newRequest();
+                DescribeAclsRequestData requestData = 
(DescribeAclsRequestData) message;
+                requestData.setPatternTypeFilter((byte) 1);
+                requestData.setResourceTypeFilter((byte) 1);
+                requestData.setPermissionType((byte) 1);
+                requestData.setOperation((byte) 1);
+            } else {
+                message = ApiMessageType.fromApiKey(key.id).newRequest();
+            }
+            ByteBuffer bytes = MessageUtil.toByteBuffer(message, version);
+            AbstractRequest req = AbstractRequest.parseRequest(key, version, 
bytes).request;
+            try {
+                RequestConvertToJson.request(req);
+            } catch (IllegalStateException e) {
+                unhandledKeys.add(key.toString());
+            }
+        }
+        assertEquals(Collections.emptyList(), unhandledKeys, "Unhandled 
request keys");
+    }
+
+    @Test
+    public void testAllApiVersionsResponseHandled() {
+        for (ApiKeys key : ApiKeys.values()) {
+            List<Short> unhandledVersions = new ArrayList<>();
+            for (short version : key.allVersions()) {
+                ApiMessage message;
+                // Specify top-level error handling for verifying 
compatibility across versions
+                if (key == ApiKeys.DESCRIBE_LOG_DIRS) {
+                    message = ApiMessageType.fromApiKey(key.id).newResponse();
+                    DescribeLogDirsResponseData responseData = 
(DescribeLogDirsResponseData) message;
+                    
responseData.setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code());
+                } else {
+                    message = ApiMessageType.fromApiKey(key.id).newResponse();
+                }
+
+                ByteBuffer bytes = MessageUtil.toByteBuffer(message, version);
+                AbstractResponse response = 
AbstractResponse.parseResponse(key, bytes, version);
+                try {
+                    RequestConvertToJson.response(response, version);
+                } catch (IllegalStateException e) {
+                    unhandledVersions.add(version);
+                }
+            }
+            assertEquals(new ArrayList<>(), unhandledVersions, "API: " + key + 
" - Unhandled request versions");
+        }
+    }
+
+    @Test
+    public void testAllResponseTypesHandled() {
+        List<String> unhandledKeys = new ArrayList<>();
+        for (ApiKeys key : ApiKeys.values()) {
+            short version = key.latestVersion();
+            ApiMessage message = 
ApiMessageType.fromApiKey(key.id).newResponse();
+            ByteBuffer bytes = MessageUtil.toByteBuffer(message, version);
+            AbstractResponse res = AbstractResponse.parseResponse(key, bytes, 
version);
+            try {
+                RequestConvertToJson.response(res, version);
+            } catch (IllegalStateException e) {
+                unhandledKeys.add(key.toString());
+            }
+        }
+        assertEquals(Collections.emptyList(), unhandledKeys, "Unhandled 
response keys");
+    }
+
+    @Test
+    public void testClientInfoNode() {
+        ClientInformation clientInfo = new ClientInformation("name", "1");
+        ObjectNode expectedNode = JsonNodeFactory.instance.objectNode();
+        expectedNode.set("softwareName", new 
TextNode(clientInfo.softwareName()));
+        expectedNode.set("softwareVersion", new 
TextNode(clientInfo.softwareVersion()));
+        JsonNode actualNode = RequestConvertToJson.clientInfoNode(clientInfo);
+        assertEquals(expectedNode, actualNode);
+    }
+}

Reply via email to