This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 18340c97337 KAFKA-17563 Move `RequestConvertToJson` to server module (#17223)
18340c97337 is described below
commit 18340c973378610cd5930974201088b6aca8a8ad
Author: xijiu <[email protected]>
AuthorDate: Fri Sep 27 02:19:47 2024 +0800
KAFKA-17563 Move `RequestConvertToJson` to server module (#17223)
Reviewers: Chia-Ping Tsai <[email protected]>
---
checkstyle/import-control-server.xml | 4 +
checkstyle/suppressions.xml | 1 +
.../main/scala/kafka/network/RequestChannel.scala | 4 +-
.../scala/kafka/network/RequestConvertToJson.scala | 267 -------
.../unit/kafka/network/RequestChannelTest.scala | 6 +-
.../kafka/network/RequestConvertToJsonTest.scala | 94 +--
.../unit/kafka/network/SocketServerTest.scala | 1 +
.../kafka/jmh/common/FetchRequestBenchmark.java | 3 +-
.../jmh/common/ListOffsetRequestBenchmark.java | 3 +-
.../kafka/jmh/common/ProduceRequestBenchmark.java | 3 +-
.../metadata/KRaftMetadataRequestBenchmark.java | 6 +-
.../jmh/metadata/MetadataRequestBenchmark.java | 6 +-
.../apache/kafka/network/RequestConvertToJson.java | 812 +++++++++++++++++++++
.../kafka/network/RequestConvertToJsonTest.java | 126 ++++
14 files changed, 968 insertions(+), 368 deletions(-)
diff --git a/checkstyle/import-control-server.xml b/checkstyle/import-control-server.xml
index 22ab6a449e6..f046ceb74a6 100644
--- a/checkstyle/import-control-server.xml
+++ b/checkstyle/import-control-server.xml
@@ -99,4 +99,8 @@
<allow pkg="org.apache.kafka.server.authorizer" />
</subpackage>
+ <subpackage name="network">
+ <allow pkg="com.fasterxml.jackson" />
+ </subpackage>
+
</import-control>
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index b640b4ae7c5..14d0630d2e9 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -50,6 +50,7 @@
<!-- server tests -->
<suppress checks="MethodLength|JavaNCSS|NPath"
files="DescribeTopicPartitionsRequestHandlerTest.java"/>
<suppress checks="CyclomaticComplexity"
files="ListConsumerGroupTest.java"/>
+ <suppress
checks="ClassFanOutComplexity|CyclomaticComplexity|MethodLength|ParameterNumber|JavaNCSS|ImportControl"
files="RequestConvertToJson.java"/>
<!-- Clients -->
<suppress id="dontUseSystemExit"
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 9c9b2a63dfe..ca4ab90957b 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -36,8 +36,10 @@ import org.apache.kafka.network.Session
import org.apache.kafka.network.metrics.{RequestChannelMetrics, RequestMetrics}
import org.apache.kafka.server.common.RequestLocal
import org.apache.kafka.server.metrics.KafkaMetricsGroup
+import org.apache.kafka.network.RequestConvertToJson
import scala.jdk.CollectionConverters._
+import scala.compat.java8.OptionConverters._
import scala.reflect.ClassTag
object RequestChannel extends Logging {
@@ -249,7 +251,7 @@ object RequestChannel extends Logging {
recordNetworkThreadTimeCallback.foreach(record =>
record(networkThreadTimeNanos))
if (isRequestLoggingEnabled) {
- val desc = RequestConvertToJson.requestDescMetrics(header, requestLog,
response.responseLog,
+ val desc = RequestConvertToJson.requestDescMetrics(header,
requestLog.asJava, response.responseLog.asJava,
context, session, isForwarded,
totalTimeMs, requestQueueTimeMs, apiLocalTimeMs,
apiRemoteTimeMs, apiThrottleTimeMs, responseQueueTimeMs,
diff --git a/core/src/main/scala/kafka/network/RequestConvertToJson.scala b/core/src/main/scala/kafka/network/RequestConvertToJson.scala
deleted file mode 100644
index a51894bd332..00000000000
--- a/core/src/main/scala/kafka/network/RequestConvertToJson.scala
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.network
-
-import com.fasterxml.jackson.databind.JsonNode
-import com.fasterxml.jackson.databind.node.{BooleanNode, DoubleNode,
JsonNodeFactory, LongNode, ObjectNode, TextNode}
-import org.apache.kafka.common.message._
-import org.apache.kafka.common.network.ClientInformation
-import org.apache.kafka.common.requests._
-import org.apache.kafka.network.Session
-
-object RequestConvertToJson {
- def request(request: AbstractRequest): JsonNode = {
- request match {
- case req: AddOffsetsToTxnRequest =>
AddOffsetsToTxnRequestDataJsonConverter.write(req.data, request.version)
- case req: AddPartitionsToTxnRequest =>
AddPartitionsToTxnRequestDataJsonConverter.write(req.data, request.version)
- case req: AllocateProducerIdsRequest =>
AllocateProducerIdsRequestDataJsonConverter.write(req.data, request.version)
- case req: AlterClientQuotasRequest =>
AlterClientQuotasRequestDataJsonConverter.write(req.data, request.version)
- case req: AlterConfigsRequest =>
AlterConfigsRequestDataJsonConverter.write(req.data, request.version)
- case req: AlterPartitionReassignmentsRequest =>
AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data,
request.version)
- case req: AlterPartitionRequest =>
AlterPartitionRequestDataJsonConverter.write(req.data, request.version)
- case req: AlterReplicaLogDirsRequest =>
AlterReplicaLogDirsRequestDataJsonConverter.write(req.data, request.version)
- case res: AlterUserScramCredentialsRequest =>
AlterUserScramCredentialsRequestDataJsonConverter.write(res.data,
request.version)
- case req: ApiVersionsRequest =>
ApiVersionsRequestDataJsonConverter.write(req.data, request.version)
- case req: AssignReplicasToDirsRequest =>
AssignReplicasToDirsRequestDataJsonConverter.write(req.data, request.version)
- case req: BeginQuorumEpochRequest =>
BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version)
- case req: BrokerHeartbeatRequest =>
BrokerHeartbeatRequestDataJsonConverter.write(req.data, request.version)
- case req: BrokerRegistrationRequest =>
BrokerRegistrationRequestDataJsonConverter.write(req.data, request.version)
- case req: ConsumerGroupDescribeRequest =>
ConsumerGroupDescribeRequestDataJsonConverter.write(req.data, request.version)
- case req: ConsumerGroupHeartbeatRequest =>
ConsumerGroupHeartbeatRequestDataJsonConverter.write(req.data, request.version)
- case req: ControlledShutdownRequest =>
ControlledShutdownRequestDataJsonConverter.write(req.data, request.version)
- case req: ControllerRegistrationRequest =>
ControllerRegistrationRequestDataJsonConverter.write(req.data, request.version)
- case req: CreateAclsRequest =>
CreateAclsRequestDataJsonConverter.write(req.data, request.version)
- case req: CreateDelegationTokenRequest =>
CreateDelegationTokenRequestDataJsonConverter.write(req.data, request.version)
- case req: CreatePartitionsRequest =>
CreatePartitionsRequestDataJsonConverter.write(req.data, request.version)
- case req: CreateTopicsRequest =>
CreateTopicsRequestDataJsonConverter.write(req.data, request.version)
- case req: DeleteAclsRequest =>
DeleteAclsRequestDataJsonConverter.write(req.data, request.version)
- case req: DeleteGroupsRequest =>
DeleteGroupsRequestDataJsonConverter.write(req.data, request.version)
- case req: DeleteRecordsRequest =>
DeleteRecordsRequestDataJsonConverter.write(req.data, request.version)
- case req: DeleteShareGroupStateRequest =>
DeleteShareGroupStateRequestDataJsonConverter.write(req.data, request.version)
- case req: DeleteTopicsRequest =>
DeleteTopicsRequestDataJsonConverter.write(req.data, request.version)
- case req: DescribeAclsRequest =>
DescribeAclsRequestDataJsonConverter.write(req.data, request.version)
- case req: DescribeClientQuotasRequest =>
DescribeClientQuotasRequestDataJsonConverter.write(req.data, request.version)
- case req: DescribeClusterRequest =>
DescribeClusterRequestDataJsonConverter.write(req.data, request.version)
- case req: DescribeConfigsRequest =>
DescribeConfigsRequestDataJsonConverter.write(req.data, request.version)
- case req: DescribeDelegationTokenRequest =>
DescribeDelegationTokenRequestDataJsonConverter.write(req.data, request.version)
- case req: DescribeGroupsRequest =>
DescribeGroupsRequestDataJsonConverter.write(req.data, request.version)
- case req: DescribeLogDirsRequest =>
DescribeLogDirsRequestDataJsonConverter.write(req.data, request.version)
- case req: DescribeProducersRequest =>
DescribeProducersRequestDataJsonConverter.write(req.data, request.version)
- case req: DescribeQuorumRequest =>
DescribeQuorumRequestDataJsonConverter.write(req.data, request.version)
- case res: DescribeTopicPartitionsRequest =>
DescribeTopicPartitionsRequestDataJsonConverter.write(res.data, request.version)
- case req: DescribeTransactionsRequest =>
DescribeTransactionsRequestDataJsonConverter.write(req.data, request.version)
- case res: DescribeUserScramCredentialsRequest =>
DescribeUserScramCredentialsRequestDataJsonConverter.write(res.data,
request.version)
- case req: ElectLeadersRequest =>
ElectLeadersRequestDataJsonConverter.write(req.data, request.version)
- case req: EndQuorumEpochRequest =>
EndQuorumEpochRequestDataJsonConverter.write(req.data, request.version)
- case req: EndTxnRequest =>
EndTxnRequestDataJsonConverter.write(req.data, request.version)
- case req: EnvelopeRequest =>
EnvelopeRequestDataJsonConverter.write(req.data, request.version)
- case req: ExpireDelegationTokenRequest =>
ExpireDelegationTokenRequestDataJsonConverter.write(req.data, request.version)
- case req: FetchRequest => FetchRequestDataJsonConverter.write(req.data,
request.version)
- case req: FetchSnapshotRequest =>
FetchSnapshotRequestDataJsonConverter.write(req.data, request.version)
- case req: FindCoordinatorRequest =>
FindCoordinatorRequestDataJsonConverter.write(req.data, request.version)
- case req: GetTelemetrySubscriptionsRequest =>
GetTelemetrySubscriptionsRequestDataJsonConverter.write(req.data,
request.version)
- case req: HeartbeatRequest =>
HeartbeatRequestDataJsonConverter.write(req.data, request.version)
- case req: IncrementalAlterConfigsRequest =>
IncrementalAlterConfigsRequestDataJsonConverter.write(req.data, request.version)
- case req: InitializeShareGroupStateRequest =>
InitializeShareGroupStateRequestDataJsonConverter.write(req.data,
request.version)
- case req: InitProducerIdRequest =>
InitProducerIdRequestDataJsonConverter.write(req.data, request.version)
- case req: JoinGroupRequest =>
JoinGroupRequestDataJsonConverter.write(req.data, request.version)
- case req: LeaderAndIsrRequest =>
LeaderAndIsrRequestDataJsonConverter.write(req.data, request.version)
- case req: LeaveGroupRequest =>
LeaveGroupRequestDataJsonConverter.write(req.data, request.version)
- case req: ListClientMetricsResourcesRequest =>
ListClientMetricsResourcesRequestDataJsonConverter.write(req.data,
request.version)
- case req: ListGroupsRequest =>
ListGroupsRequestDataJsonConverter.write(req.data, request.version)
- case req: ListOffsetsRequest =>
ListOffsetsRequestDataJsonConverter.write(req.data, request.version)
- case req: ListPartitionReassignmentsRequest =>
ListPartitionReassignmentsRequestDataJsonConverter.write(req.data,
request.version)
- case req: ListTransactionsRequest =>
ListTransactionsRequestDataJsonConverter.write(req.data, request.version)
- case req: MetadataRequest =>
MetadataRequestDataJsonConverter.write(req.data, request.version)
- case req: OffsetCommitRequest =>
OffsetCommitRequestDataJsonConverter.write(req.data, request.version)
- case req: OffsetDeleteRequest =>
OffsetDeleteRequestDataJsonConverter.write(req.data, request.version)
- case req: OffsetFetchRequest =>
OffsetFetchRequestDataJsonConverter.write(req.data, request.version)
- case req: OffsetsForLeaderEpochRequest =>
OffsetForLeaderEpochRequestDataJsonConverter.write(req.data, request.version)
- case req: ProduceRequest =>
ProduceRequestDataJsonConverter.write(req.data, request.version, false)
- case req: PushTelemetryRequest =>
PushTelemetryRequestDataJsonConverter.write(req.data, request.version)
- case req: ReadShareGroupStateRequest =>
ReadShareGroupStateRequestDataJsonConverter.write(req.data, request.version)
- case req: ReadShareGroupStateSummaryRequest =>
ReadShareGroupStateSummaryRequestDataJsonConverter.write(req.data,
request.version)
- case req: RenewDelegationTokenRequest =>
RenewDelegationTokenRequestDataJsonConverter.write(req.data, request.version)
- case req: SaslAuthenticateRequest =>
SaslAuthenticateRequestDataJsonConverter.write(req.data, request.version)
- case req: SaslHandshakeRequest =>
SaslHandshakeRequestDataJsonConverter.write(req.data, request.version)
- case req: ShareAcknowledgeRequest =>
ShareAcknowledgeRequestDataJsonConverter.write(req.data, request.version)
- case req: ShareFetchRequest =>
ShareFetchRequestDataJsonConverter.write(req.data, request.version)
- case req: ShareGroupDescribeRequest =>
ShareGroupDescribeRequestDataJsonConverter.write(req.data, request.version)
- case req: ShareGroupHeartbeatRequest =>
ShareGroupHeartbeatRequestDataJsonConverter.write(req.data, request.version)
- case req: StopReplicaRequest =>
StopReplicaRequestDataJsonConverter.write(req.data, request.version)
- case req: SyncGroupRequest =>
SyncGroupRequestDataJsonConverter.write(req.data, request.version)
- case req: TxnOffsetCommitRequest =>
TxnOffsetCommitRequestDataJsonConverter.write(req.data, request.version)
- case req: UnregisterBrokerRequest =>
UnregisterBrokerRequestDataJsonConverter.write(req.data, request.version)
- case req: UpdateFeaturesRequest =>
UpdateFeaturesRequestDataJsonConverter.write(req.data, request.version)
- case req: UpdateMetadataRequest =>
UpdateMetadataRequestDataJsonConverter.write(req.data, request.version)
- case req: VoteRequest => VoteRequestDataJsonConverter.write(req.data,
request.version)
- case req: WriteShareGroupStateRequest =>
WriteShareGroupStateRequestDataJsonConverter.write(req.data, request.version)
- case req: WriteTxnMarkersRequest =>
WriteTxnMarkersRequestDataJsonConverter.write(req.data, request.version)
- case req: AddRaftVoterRequest =>
AddRaftVoterRequestDataJsonConverter.write(req.data, request.version)
- case req: RemoveRaftVoterRequest =>
RemoveRaftVoterRequestDataJsonConverter.write(req.data, request.version)
- case req: UpdateRaftVoterRequest =>
UpdateRaftVoterRequestDataJsonConverter.write(req.data, request.version)
- case _ => throw new IllegalStateException(s"ApiKey ${request.apiKey} is
not currently handled in `request`, the " +
- "code should be updated to do so.")
- }
- }
-
- def response(response: AbstractResponse, version: Short): JsonNode = {
- response match {
- case res: AddOffsetsToTxnResponse =>
AddOffsetsToTxnResponseDataJsonConverter.write(res.data, version)
- case res: AddPartitionsToTxnResponse =>
AddPartitionsToTxnResponseDataJsonConverter.write(res.data, version)
- case res: AllocateProducerIdsResponse =>
AllocateProducerIdsResponseDataJsonConverter.write(res.data, version)
- case res: AlterClientQuotasResponse =>
AlterClientQuotasResponseDataJsonConverter.write(res.data, version)
- case res: AlterConfigsResponse =>
AlterConfigsResponseDataJsonConverter.write(res.data, version)
- case res: AlterPartitionReassignmentsResponse =>
AlterPartitionReassignmentsResponseDataJsonConverter.write(res.data, version)
- case res: AlterPartitionResponse =>
AlterPartitionResponseDataJsonConverter.write(res.data, version)
- case res: AlterReplicaLogDirsResponse =>
AlterReplicaLogDirsResponseDataJsonConverter.write(res.data, version)
- case res: AlterUserScramCredentialsResponse =>
AlterUserScramCredentialsResponseDataJsonConverter.write(res.data, version)
- case res: ApiVersionsResponse =>
ApiVersionsResponseDataJsonConverter.write(res.data, version)
- case res: AssignReplicasToDirsResponse =>
AssignReplicasToDirsResponseDataJsonConverter.write(res.data, version)
- case res: BeginQuorumEpochResponse =>
BeginQuorumEpochResponseDataJsonConverter.write(res.data, version)
- case res: BrokerHeartbeatResponse =>
BrokerHeartbeatResponseDataJsonConverter.write(res.data, version)
- case res: BrokerRegistrationResponse =>
BrokerRegistrationResponseDataJsonConverter.write(res.data, version)
- case res: ConsumerGroupDescribeResponse =>
ConsumerGroupDescribeResponseDataJsonConverter.write(res.data, version)
- case res: ConsumerGroupHeartbeatResponse =>
ConsumerGroupHeartbeatResponseDataJsonConverter.write(res.data, version)
- case res: ControlledShutdownResponse =>
ControlledShutdownResponseDataJsonConverter.write(res.data, version)
- case req: ControllerRegistrationResponse =>
ControllerRegistrationResponseDataJsonConverter.write(req.data, version)
- case res: CreateAclsResponse =>
CreateAclsResponseDataJsonConverter.write(res.data, version)
- case res: CreateDelegationTokenResponse =>
CreateDelegationTokenResponseDataJsonConverter.write(res.data, version)
- case res: CreatePartitionsResponse =>
CreatePartitionsResponseDataJsonConverter.write(res.data, version)
- case res: CreateTopicsResponse =>
CreateTopicsResponseDataJsonConverter.write(res.data, version)
- case res: DeleteAclsResponse =>
DeleteAclsResponseDataJsonConverter.write(res.data, version)
- case res: DeleteGroupsResponse =>
DeleteGroupsResponseDataJsonConverter.write(res.data, version)
- case res: DeleteRecordsResponse =>
DeleteRecordsResponseDataJsonConverter.write(res.data, version)
- case res: DeleteShareGroupStateResponse =>
DeleteShareGroupStateResponseDataJsonConverter.write(res.data, version)
- case res: DeleteTopicsResponse =>
DeleteTopicsResponseDataJsonConverter.write(res.data, version)
- case res: DescribeAclsResponse =>
DescribeAclsResponseDataJsonConverter.write(res.data, version)
- case res: DescribeClientQuotasResponse =>
DescribeClientQuotasResponseDataJsonConverter.write(res.data, version)
- case res: DescribeClusterResponse =>
DescribeClusterResponseDataJsonConverter.write(res.data, version)
- case res: DescribeConfigsResponse =>
DescribeConfigsResponseDataJsonConverter.write(res.data, version)
- case res: DescribeDelegationTokenResponse =>
DescribeDelegationTokenResponseDataJsonConverter.write(res.data, version)
- case res: DescribeGroupsResponse =>
DescribeGroupsResponseDataJsonConverter.write(res.data, version)
- case res: DescribeLogDirsResponse =>
DescribeLogDirsResponseDataJsonConverter.write(res.data, version)
- case res: DescribeProducersResponse =>
DescribeProducersResponseDataJsonConverter.write(res.data, version)
- case res: DescribeQuorumResponse =>
DescribeQuorumResponseDataJsonConverter.write(res.data, version)
- case res: DescribeTopicPartitionsResponse =>
DescribeTopicPartitionsResponseDataJsonConverter.write(res.data, version)
- case res: DescribeTransactionsResponse =>
DescribeTransactionsResponseDataJsonConverter.write(res.data, version)
- case res: DescribeUserScramCredentialsResponse =>
DescribeUserScramCredentialsResponseDataJsonConverter.write(res.data, version)
- case res: ElectLeadersResponse =>
ElectLeadersResponseDataJsonConverter.write(res.data, version)
- case res: EndQuorumEpochResponse =>
EndQuorumEpochResponseDataJsonConverter.write(res.data, version)
- case res: EndTxnResponse =>
EndTxnResponseDataJsonConverter.write(res.data, version)
- case res: EnvelopeResponse =>
EnvelopeResponseDataJsonConverter.write(res.data, version)
- case res: ExpireDelegationTokenResponse =>
ExpireDelegationTokenResponseDataJsonConverter.write(res.data, version)
- case res: FetchResponse =>
FetchResponseDataJsonConverter.write(res.data, version, false)
- case res: FetchSnapshotResponse =>
FetchSnapshotResponseDataJsonConverter.write(res.data, version)
- case res: FindCoordinatorResponse =>
FindCoordinatorResponseDataJsonConverter.write(res.data, version)
- case res: GetTelemetrySubscriptionsResponse =>
GetTelemetrySubscriptionsResponseDataJsonConverter.write(res.data, version)
- case res: HeartbeatResponse =>
HeartbeatResponseDataJsonConverter.write(res.data, version)
- case res: IncrementalAlterConfigsResponse =>
IncrementalAlterConfigsResponseDataJsonConverter.write(res.data, version)
- case res: InitializeShareGroupStateResponse =>
InitializeShareGroupStateResponseDataJsonConverter.write(res.data, version)
- case res: InitProducerIdResponse =>
InitProducerIdResponseDataJsonConverter.write(res.data, version)
- case res: JoinGroupResponse =>
JoinGroupResponseDataJsonConverter.write(res.data, version)
- case res: LeaderAndIsrResponse =>
LeaderAndIsrResponseDataJsonConverter.write(res.data, version)
- case res: LeaveGroupResponse =>
LeaveGroupResponseDataJsonConverter.write(res.data, version)
- case res: ListClientMetricsResourcesResponse =>
ListClientMetricsResourcesResponseDataJsonConverter.write(res.data, version)
- case res: ListGroupsResponse =>
ListGroupsResponseDataJsonConverter.write(res.data, version)
- case res: ListOffsetsResponse =>
ListOffsetsResponseDataJsonConverter.write(res.data, version)
- case res: ListPartitionReassignmentsResponse =>
ListPartitionReassignmentsResponseDataJsonConverter.write(res.data, version)
- case res: ListTransactionsResponse =>
ListTransactionsResponseDataJsonConverter.write(res.data, version)
- case res: MetadataResponse =>
MetadataResponseDataJsonConverter.write(res.data, version)
- case res: OffsetCommitResponse =>
OffsetCommitResponseDataJsonConverter.write(res.data, version)
- case res: OffsetDeleteResponse =>
OffsetDeleteResponseDataJsonConverter.write(res.data, version)
- case res: OffsetFetchResponse =>
OffsetFetchResponseDataJsonConverter.write(res.data, version)
- case res: OffsetsForLeaderEpochResponse =>
OffsetForLeaderEpochResponseDataJsonConverter.write(res.data, version)
- case res: ProduceResponse =>
ProduceResponseDataJsonConverter.write(res.data, version)
- case res: PushTelemetryResponse =>
PushTelemetryResponseDataJsonConverter.write(res.data, version)
- case res: ReadShareGroupStateResponse =>
ReadShareGroupStateResponseDataJsonConverter.write(res.data, version)
- case res: ReadShareGroupStateSummaryResponse =>
ReadShareGroupStateSummaryResponseDataJsonConverter.write(res.data, version)
- case res: RenewDelegationTokenResponse =>
RenewDelegationTokenResponseDataJsonConverter.write(res.data, version)
- case res: SaslAuthenticateResponse =>
SaslAuthenticateResponseDataJsonConverter.write(res.data, version)
- case res: SaslHandshakeResponse =>
SaslHandshakeResponseDataJsonConverter.write(res.data, version)
- case res: ShareAcknowledgeResponse =>
ShareAcknowledgeResponseDataJsonConverter.write(res.data, version)
- case res: ShareFetchResponse =>
ShareFetchResponseDataJsonConverter.write(res.data, version)
- case res: ShareGroupDescribeResponse =>
ShareGroupDescribeResponseDataJsonConverter.write(res.data, version)
- case res: ShareGroupHeartbeatResponse =>
ShareGroupHeartbeatResponseDataJsonConverter.write(res.data, version)
- case res: StopReplicaResponse =>
StopReplicaResponseDataJsonConverter.write(res.data, version)
- case res: SyncGroupResponse =>
SyncGroupResponseDataJsonConverter.write(res.data, version)
- case res: TxnOffsetCommitResponse =>
TxnOffsetCommitResponseDataJsonConverter.write(res.data, version)
- case res: UnregisterBrokerResponse =>
UnregisterBrokerResponseDataJsonConverter.write(res.data, version)
- case res: UpdateFeaturesResponse =>
UpdateFeaturesResponseDataJsonConverter.write(res.data, version)
- case res: UpdateMetadataResponse =>
UpdateMetadataResponseDataJsonConverter.write(res.data, version)
- case res: VoteResponse => VoteResponseDataJsonConverter.write(res.data,
version)
- case res: WriteShareGroupStateResponse =>
WriteShareGroupStateResponseDataJsonConverter.write(res.data, version)
- case res: WriteTxnMarkersResponse =>
WriteTxnMarkersResponseDataJsonConverter.write(res.data, version)
- case res: AddRaftVoterResponse =>
AddRaftVoterResponseDataJsonConverter.write(res.data, version)
- case res: RemoveRaftVoterResponse =>
RemoveRaftVoterResponseDataJsonConverter.write(res.data, version)
- case res: UpdateRaftVoterResponse =>
UpdateRaftVoterResponseDataJsonConverter.write(res.data, version)
- case _ => throw new IllegalStateException(s"ApiKey ${response.apiKey} is
not currently handled in `response`, the " +
- "code should be updated to do so.")
- }
- }
-
- def requestHeaderNode(header: RequestHeader): JsonNode = {
- val node = RequestHeaderDataJsonConverter.write(header.data,
header.headerVersion, false).asInstanceOf[ObjectNode]
- node.set("requestApiKeyName", new TextNode(header.apiKey.toString))
- if (header.apiKey().isVersionDeprecated(header.apiVersion()))
- node.set("requestApiVersionDeprecated", BooleanNode.TRUE)
- node
- }
-
- def requestDesc(header: RequestHeader, requestNode: Option[JsonNode],
isForwarded: Boolean): JsonNode = {
- val node = new ObjectNode(JsonNodeFactory.instance)
- node.set("isForwarded", if (isForwarded) BooleanNode.TRUE else
BooleanNode.FALSE)
- node.set("requestHeader", requestHeaderNode(header))
- node.set("request", requestNode.getOrElse(new TextNode("")))
- node
- }
-
- def clientInfoNode(clientInfo: ClientInformation): JsonNode = {
- val node = new ObjectNode(JsonNodeFactory.instance)
- node.set("softwareName", new TextNode(clientInfo.softwareName))
- node.set("softwareVersion", new TextNode(clientInfo.softwareVersion))
- node
- }
-
- def requestDescMetrics(header: RequestHeader, requestNode: Option[JsonNode],
responseNode: Option[JsonNode],
- context: RequestContext, session: Session,
isForwarded: Boolean,
- totalTimeMs: Double, requestQueueTimeMs: Double,
apiLocalTimeMs: Double,
- apiRemoteTimeMs: Double, apiThrottleTimeMs: Long,
responseQueueTimeMs: Double,
- responseSendTimeMs: Double, temporaryMemoryBytes:
Long,
- messageConversionsTimeMs: Double): JsonNode = {
- val node = requestDesc(header, requestNode,
isForwarded).asInstanceOf[ObjectNode]
- node.set("response", responseNode.getOrElse(new TextNode("")))
- node.set("connection", new TextNode(context.connectionId))
- node.set("totalTimeMs", new DoubleNode(totalTimeMs))
- node.set("requestQueueTimeMs", new DoubleNode(requestQueueTimeMs))
- node.set("localTimeMs", new DoubleNode(apiLocalTimeMs))
- node.set("remoteTimeMs", new DoubleNode(apiRemoteTimeMs))
- node.set("throttleTimeMs", new LongNode(apiThrottleTimeMs))
- node.set("responseQueueTimeMs", new DoubleNode(responseQueueTimeMs))
- node.set("sendTimeMs", new DoubleNode(responseSendTimeMs))
- node.set("securityProtocol", new
TextNode(context.securityProtocol.toString))
- node.set("principal", new TextNode(session.principal.toString))
- node.set("listener", new TextNode(context.listenerName.value))
- node.set("clientInformation", clientInfoNode(context.clientInformation))
- if (temporaryMemoryBytes > 0)
- node.set("temporaryMemoryBytes", new LongNode(temporaryMemoryBytes))
- if (messageConversionsTimeMs > 0)
- node.set("messageConversionsTime", new
DoubleNode(messageConversionsTimeMs))
- node
- }
-}
diff --git a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
index 3aaa1458f33..d01d390813b 100644
--- a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
+++ b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
@@ -35,6 +35,7 @@ import org.apache.kafka.common.requests.AlterConfigsRequest._
import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.{KafkaPrincipal,
KafkaPrincipalSerde, SecurityProtocol}
import org.apache.kafka.common.utils.{SecurityUtils, Utils}
+import org.apache.kafka.network.RequestConvertToJson
import org.apache.kafka.network.metrics.RequestChannelMetrics
import org.apache.kafka.test
import org.junit.jupiter.api.Assertions._
@@ -49,6 +50,7 @@ import java.nio.ByteBuffer
import java.util.Collections
import java.util.concurrent.atomic.AtomicReference
import scala.collection.{Map, Seq}
+import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
class RequestChannelTest {
@@ -69,7 +71,7 @@ class RequestChannelTest {
val loggableAlterConfigs =
alterConfigs.loggableRequest.asInstanceOf[AlterConfigsRequest]
val loggedConfig = loggableAlterConfigs.configs.get(resource)
assertEquals(expectedValues, toMap(loggedConfig))
- val alterConfigsDesc =
RequestConvertToJson.requestDesc(alterConfigs.header, alterConfigs.requestLog,
alterConfigs.isForwarded).toString
+ val alterConfigsDesc =
RequestConvertToJson.requestDesc(alterConfigs.header,
alterConfigs.requestLog.asJava, alterConfigs.isForwarded).toString
assertFalse(alterConfigsDesc.contains(sensitiveValue), s"Sensitive
config logged $alterConfigsDesc")
}
@@ -133,7 +135,7 @@ class RequestChannelTest {
val loggableAlterConfigs =
alterConfigs.loggableRequest.asInstanceOf[IncrementalAlterConfigsRequest]
val loggedConfig =
loggableAlterConfigs.data.resources.find(resource.`type`.id,
resource.name).configs
assertEquals(expectedValues, toMap(loggedConfig))
- val alterConfigsDesc =
RequestConvertToJson.requestDesc(alterConfigs.header, alterConfigs.requestLog,
alterConfigs.isForwarded).toString
+ val alterConfigsDesc =
RequestConvertToJson.requestDesc(alterConfigs.header,
alterConfigs.requestLog.asJava, alterConfigs.isForwarded).toString
assertFalse(alterConfigsDesc.contains(sensitiveValue), s"Sensitive
config logged $alterConfigsDesc")
}
diff --git a/core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala b/core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala
index ac9870cc06a..bdc7da74ddd 100644
--- a/core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala
+++ b/core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala
@@ -21,91 +21,22 @@ import java.net.InetAddress
import java.nio.ByteBuffer
import com.fasterxml.jackson.databind.node.{BooleanNode, DoubleNode,
JsonNodeFactory, LongNode, ObjectNode, TextNode}
import kafka.network
-import kafka.network.RequestConvertToJson.requestHeaderNode
import org.apache.kafka.common.memory.MemoryPool
import org.apache.kafka.common.message._
import org.apache.kafka.common.network.{ClientInformation, ListenerName,
NetworkSend}
-import org.apache.kafka.common.protocol.{ApiKeys, Errors, MessageUtil}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
+import org.apache.kafka.network.RequestConvertToJson
import org.apache.kafka.network.metrics.RequestChannelMetrics
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.mockito.Mockito.mock
import java.util.Collections
-import scala.collection.mutable.ArrayBuffer
+import scala.compat.java8.OptionConverters._
class RequestConvertToJsonTest {
- @Test
- def testAllRequestTypesHandled(): Unit = {
- val unhandledKeys = ArrayBuffer[String]()
- ApiKeys.values().foreach { key => {
- val version: Short = key.latestVersion()
- val message = key match {
- case ApiKeys.DESCRIBE_ACLS =>
-
ApiMessageType.fromApiKey(key.id).newRequest().asInstanceOf[DescribeAclsRequestData]
-
.setPatternTypeFilter(1).setResourceTypeFilter(1).setPermissionType(1).setOperation(1)
- case _ =>
- ApiMessageType.fromApiKey(key.id).newRequest()
- }
-
- val bytes = MessageUtil.toByteBuffer(message, version)
- val req = AbstractRequest.parseRequest(key, version, bytes).request
- try {
- RequestConvertToJson.request(req)
- } catch {
- case _ : IllegalStateException => unhandledKeys += key.toString
- }
- }}
- assertEquals(ArrayBuffer.empty, unhandledKeys, "Unhandled request keys")
- }
-
- @Test
- def testAllApiVersionsResponseHandled(): Unit = {
-
- ApiKeys.values().foreach { key => {
- val unhandledVersions = ArrayBuffer[java.lang.Short]()
- key.allVersions().forEach { version => {
- val message = key match {
- // Specify top-level error handling for verifying compatibility
across versions
- case ApiKeys.DESCRIBE_LOG_DIRS =>
-
ApiMessageType.fromApiKey(key.id).newResponse().asInstanceOf[DescribeLogDirsResponseData]
- .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code())
- case _ =>
- ApiMessageType.fromApiKey(key.id).newResponse()
- }
-
- val bytes = MessageUtil.toByteBuffer(message, version)
- val response = AbstractResponse.parseResponse(key, bytes, version)
- try {
- RequestConvertToJson.response(response, version)
- } catch {
- case _ : IllegalStateException => unhandledVersions += version
- }}
- }
- assertEquals(ArrayBuffer.empty, unhandledVersions, s"API:
${key.toString} - Unhandled request versions")
- }}
- }
-
- @Test
- def testAllResponseTypesHandled(): Unit = {
- val unhandledKeys = ArrayBuffer[String]()
- ApiKeys.values().foreach { key => {
- val version: Short = key.latestVersion()
- val message = ApiMessageType.fromApiKey(key.id).newResponse()
- val bytes = MessageUtil.toByteBuffer(message, version)
- val res = AbstractResponse.parseResponse(key, bytes, version)
- try {
- RequestConvertToJson.response(res, version)
- } catch {
- case _ : IllegalStateException => unhandledKeys += key.toString
- }
- }}
- assertEquals(ArrayBuffer.empty, unhandledKeys, "Unhandled response keys")
- }
-
@Test
def testRequestHeaderNode(): Unit = {
val alterIsrRequest = new AlterPartitionRequest(new
AlterPartitionRequestData(), 0)
@@ -135,19 +66,6 @@ class RequestConvertToJsonTest {
assertEquals(expectedNode, actualNode)
}
- @Test
- def testClientInfoNode(): Unit = {
- val clientInfo = new ClientInformation("name", "1")
-
- val expectedNode = new ObjectNode(JsonNodeFactory.instance)
- expectedNode.set("softwareName", new TextNode(clientInfo.softwareName))
- expectedNode.set("softwareVersion", new
TextNode(clientInfo.softwareVersion))
-
- val actualNode = RequestConvertToJson.clientInfoNode(clientInfo)
-
- assertEquals(expectedNode, actualNode)
- }
-
@Test
def testRequestDesc(): Unit = {
val alterIsrRequest = new AlterPartitionRequest(new
AlterPartitionRequestData(), 0)
@@ -155,10 +73,10 @@ class RequestConvertToJsonTest {
val expectedNode = new ObjectNode(JsonNodeFactory.instance)
expectedNode.set("isForwarded", if (req.isForwarded) BooleanNode.TRUE else
BooleanNode.FALSE)
- expectedNode.set("requestHeader", requestHeaderNode(req.header))
+ expectedNode.set("requestHeader",
RequestConvertToJson.requestHeaderNode(req.header))
expectedNode.set("request", req.requestLog.getOrElse(new TextNode("")))
- val actualNode = RequestConvertToJson.requestDesc(req.header,
req.requestLog, req.isForwarded)
+ val actualNode = RequestConvertToJson.requestDesc(req.header,
req.requestLog.asJava, req.isForwarded)
assertEquals(expectedNode, actualNode)
}
@@ -181,7 +99,7 @@ class RequestConvertToJsonTest {
val temporaryMemoryBytes = 8
val messageConversionsTimeMs = 9
- val expectedNode = RequestConvertToJson.requestDesc(req.header,
req.requestLog, req.isForwarded).asInstanceOf[ObjectNode]
+ val expectedNode = RequestConvertToJson.requestDesc(req.header,
req.requestLog.asJava, req.isForwarded).asInstanceOf[ObjectNode]
expectedNode.set("response", res.responseLog.getOrElse(new TextNode("")))
expectedNode.set("connection", new TextNode(req.context.connectionId))
expectedNode.set("totalTimeMs", new DoubleNode(totalTimeMs))
@@ -198,7 +116,7 @@ class RequestConvertToJsonTest {
expectedNode.set("temporaryMemoryBytes", new
LongNode(temporaryMemoryBytes))
expectedNode.set("messageConversionsTime", new
DoubleNode(messageConversionsTimeMs))
- val actualNode = RequestConvertToJson.requestDescMetrics(req.header,
req.requestLog, res.responseLog, req.context, req.session, req.isForwarded,
+ val actualNode = RequestConvertToJson.requestDescMetrics(req.header,
req.requestLog.asJava, res.responseLog.asJava, req.context, req.session,
req.isForwarded,
totalTimeMs, requestQueueTimeMs, apiLocalTimeMs, apiRemoteTimeMs,
apiThrottleTimeMs, responseQueueTimeMs,
responseSendTimeMs, temporaryMemoryBytes,
messageConversionsTimeMs).asInstanceOf[ObjectNode]
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index b7a1c7c32f0..2a34d2aea5f 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -35,6 +35,7 @@ import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.utils._
+import org.apache.kafka.network.RequestConvertToJson
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.network.metrics.RequestMetrics
import org.apache.kafka.security.CredentialProvider
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchRequestBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchRequestBenchmark.java
index eecb619c0e8..f0fa2c32204 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchRequestBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchRequestBenchmark.java
@@ -17,8 +17,6 @@
package org.apache.kafka.jmh.common;
-import kafka.network.RequestConvertToJson;
-
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.network.Send;
@@ -27,6 +25,7 @@ import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.ByteBufferChannel;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.network.RequestConvertToJson;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ListOffsetRequestBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ListOffsetRequestBenchmark.java
index f5e8b3e4591..943c5baa145 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ListOffsetRequestBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ListOffsetRequestBenchmark.java
@@ -17,13 +17,12 @@
package org.apache.kafka.jmh.common;
-import kafka.network.RequestConvertToJson;
-
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.ListOffsetsRequestData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.network.RequestConvertToJson;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ProduceRequestBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ProduceRequestBenchmark.java
index 2fcafc5ada1..55ccee8516e 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ProduceRequestBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/ProduceRequestBenchmark.java
@@ -17,11 +17,10 @@
package org.apache.kafka.jmh.common;
-import kafka.network.RequestConvertToJson;
-
import org.apache.kafka.common.message.ProduceRequestData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.ProduceRequest;
+import org.apache.kafka.network.RequestConvertToJson;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
index 7b2bd65a621..a56ff3400a7 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
@@ -19,7 +19,6 @@ package org.apache.kafka.jmh.metadata;
import kafka.coordinator.transaction.TransactionCoordinator;
import kafka.network.RequestChannel;
-import kafka.network.RequestConvertToJson;
import kafka.server.AutoTopicCreationManager;
import kafka.server.ClientQuotaManager;
import kafka.server.ClientRequestQuotaManager;
@@ -58,6 +57,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinator;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.network.RequestConvertToJson;
import org.apache.kafka.network.metrics.RequestChannelMetrics;
import org.apache.kafka.raft.QuorumConfig;
import org.apache.kafka.server.common.FinalizedFeatures;
@@ -237,7 +237,9 @@ public class KRaftMetadataRequestBenchmark {
@Benchmark
public String testRequestToJson() {
- return
RequestConvertToJson.requestDesc(allTopicMetadataRequest.header(),
allTopicMetadataRequest.requestLog(),
allTopicMetadataRequest.isForwarded()).toString();
+ Option<com.fasterxml.jackson.databind.JsonNode> option =
allTopicMetadataRequest.requestLog();
+ Optional<com.fasterxml.jackson.databind.JsonNode> optional =
option.isDefined() ? Optional.of(option.get()) : Optional.empty();
+ return
RequestConvertToJson.requestDesc(allTopicMetadataRequest.header(), optional,
allTopicMetadataRequest.isForwarded()).toString();
}
@Benchmark
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
index e7acdb5de41..eeebbfaa1a5 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
@@ -20,7 +20,6 @@ package org.apache.kafka.jmh.metadata;
import kafka.controller.KafkaController;
import kafka.coordinator.transaction.TransactionCoordinator;
import kafka.network.RequestChannel;
-import kafka.network.RequestConvertToJson;
import kafka.server.AutoTopicCreationManager;
import kafka.server.BrokerFeatures;
import kafka.server.ClientQuotaManager;
@@ -59,6 +58,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupCoordinator;
+import org.apache.kafka.network.RequestConvertToJson;
import org.apache.kafka.network.metrics.RequestChannelMetrics;
import org.apache.kafka.server.common.FinalizedFeatures;
import org.apache.kafka.server.common.MetadataVersion;
@@ -237,7 +237,9 @@ public class MetadataRequestBenchmark {
@Benchmark
public String testRequestToJson() {
- return
RequestConvertToJson.requestDesc(allTopicMetadataRequest.header(),
allTopicMetadataRequest.requestLog(),
allTopicMetadataRequest.isForwarded()).toString();
+ Option<com.fasterxml.jackson.databind.JsonNode> option =
allTopicMetadataRequest.requestLog();
+ Optional<com.fasterxml.jackson.databind.JsonNode> optional =
option.isDefined() ? Optional.of(option.get()) : Optional.empty();
+ return
RequestConvertToJson.requestDesc(allTopicMetadataRequest.header(), optional,
allTopicMetadataRequest.isForwarded()).toString();
}
@Benchmark
diff --git
a/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java
b/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java
new file mode 100644
index 00000000000..ac744ef7bac
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java
@@ -0,0 +1,812 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.network;
+
+import org.apache.kafka.common.message.AddOffsetsToTxnRequestDataJsonConverter;
+import
org.apache.kafka.common.message.AddOffsetsToTxnResponseDataJsonConverter;
+import
org.apache.kafka.common.message.AddPartitionsToTxnRequestDataJsonConverter;
+import
org.apache.kafka.common.message.AddPartitionsToTxnResponseDataJsonConverter;
+import org.apache.kafka.common.message.AddRaftVoterRequestDataJsonConverter;
+import org.apache.kafka.common.message.AddRaftVoterResponseDataJsonConverter;
+import
org.apache.kafka.common.message.AllocateProducerIdsRequestDataJsonConverter;
+import
org.apache.kafka.common.message.AllocateProducerIdsResponseDataJsonConverter;
+import
org.apache.kafka.common.message.AlterClientQuotasRequestDataJsonConverter;
+import
org.apache.kafka.common.message.AlterClientQuotasResponseDataJsonConverter;
+import org.apache.kafka.common.message.AlterConfigsRequestDataJsonConverter;
+import org.apache.kafka.common.message.AlterConfigsResponseDataJsonConverter;
+import
org.apache.kafka.common.message.AlterPartitionReassignmentsRequestDataJsonConverter;
+import
org.apache.kafka.common.message.AlterPartitionReassignmentsResponseDataJsonConverter;
+import org.apache.kafka.common.message.AlterPartitionRequestDataJsonConverter;
+import org.apache.kafka.common.message.AlterPartitionResponseDataJsonConverter;
+import
org.apache.kafka.common.message.AlterReplicaLogDirsRequestDataJsonConverter;
+import
org.apache.kafka.common.message.AlterReplicaLogDirsResponseDataJsonConverter;
+import
org.apache.kafka.common.message.AlterUserScramCredentialsRequestDataJsonConverter;
+import
org.apache.kafka.common.message.AlterUserScramCredentialsResponseDataJsonConverter;
+import org.apache.kafka.common.message.ApiVersionsRequestDataJsonConverter;
+import org.apache.kafka.common.message.ApiVersionsResponseDataJsonConverter;
+import
org.apache.kafka.common.message.AssignReplicasToDirsRequestDataJsonConverter;
+import
org.apache.kafka.common.message.AssignReplicasToDirsResponseDataJsonConverter;
+import
org.apache.kafka.common.message.BeginQuorumEpochRequestDataJsonConverter;
+import
org.apache.kafka.common.message.BeginQuorumEpochResponseDataJsonConverter;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestDataJsonConverter;
+import
org.apache.kafka.common.message.BrokerHeartbeatResponseDataJsonConverter;
+import
org.apache.kafka.common.message.BrokerRegistrationRequestDataJsonConverter;
+import
org.apache.kafka.common.message.BrokerRegistrationResponseDataJsonConverter;
+import
org.apache.kafka.common.message.ConsumerGroupDescribeRequestDataJsonConverter;
+import
org.apache.kafka.common.message.ConsumerGroupDescribeResponseDataJsonConverter;
+import
org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestDataJsonConverter;
+import
org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseDataJsonConverter;
+import
org.apache.kafka.common.message.ControlledShutdownRequestDataJsonConverter;
+import
org.apache.kafka.common.message.ControlledShutdownResponseDataJsonConverter;
+import
org.apache.kafka.common.message.ControllerRegistrationRequestDataJsonConverter;
+import
org.apache.kafka.common.message.ControllerRegistrationResponseDataJsonConverter;
+import org.apache.kafka.common.message.CreateAclsRequestDataJsonConverter;
+import org.apache.kafka.common.message.CreateAclsResponseDataJsonConverter;
+import
org.apache.kafka.common.message.CreateDelegationTokenRequestDataJsonConverter;
+import
org.apache.kafka.common.message.CreateDelegationTokenResponseDataJsonConverter;
+import
org.apache.kafka.common.message.CreatePartitionsRequestDataJsonConverter;
+import
org.apache.kafka.common.message.CreatePartitionsResponseDataJsonConverter;
+import org.apache.kafka.common.message.CreateTopicsRequestDataJsonConverter;
+import org.apache.kafka.common.message.CreateTopicsResponseDataJsonConverter;
+import org.apache.kafka.common.message.DeleteAclsRequestDataJsonConverter;
+import org.apache.kafka.common.message.DeleteAclsResponseDataJsonConverter;
+import org.apache.kafka.common.message.DeleteGroupsRequestDataJsonConverter;
+import org.apache.kafka.common.message.DeleteGroupsResponseDataJsonConverter;
+import org.apache.kafka.common.message.DeleteRecordsRequestDataJsonConverter;
+import org.apache.kafka.common.message.DeleteRecordsResponseDataJsonConverter;
+import
org.apache.kafka.common.message.DeleteShareGroupStateRequestDataJsonConverter;
+import
org.apache.kafka.common.message.DeleteShareGroupStateResponseDataJsonConverter;
+import org.apache.kafka.common.message.DeleteTopicsRequestDataJsonConverter;
+import org.apache.kafka.common.message.DeleteTopicsResponseDataJsonConverter;
+import org.apache.kafka.common.message.DescribeAclsRequestDataJsonConverter;
+import org.apache.kafka.common.message.DescribeAclsResponseDataJsonConverter;
+import
org.apache.kafka.common.message.DescribeClientQuotasRequestDataJsonConverter;
+import
org.apache.kafka.common.message.DescribeClientQuotasResponseDataJsonConverter;
+import org.apache.kafka.common.message.DescribeClusterRequestDataJsonConverter;
+import
org.apache.kafka.common.message.DescribeClusterResponseDataJsonConverter;
+import org.apache.kafka.common.message.DescribeConfigsRequestDataJsonConverter;
+import
org.apache.kafka.common.message.DescribeConfigsResponseDataJsonConverter;
+import
org.apache.kafka.common.message.DescribeDelegationTokenRequestDataJsonConverter;
+import
org.apache.kafka.common.message.DescribeDelegationTokenResponseDataJsonConverter;
+import org.apache.kafka.common.message.DescribeGroupsRequestDataJsonConverter;
+import org.apache.kafka.common.message.DescribeGroupsResponseDataJsonConverter;
+import org.apache.kafka.common.message.DescribeLogDirsRequestDataJsonConverter;
+import
org.apache.kafka.common.message.DescribeLogDirsResponseDataJsonConverter;
+import
org.apache.kafka.common.message.DescribeProducersRequestDataJsonConverter;
+import
org.apache.kafka.common.message.DescribeProducersResponseDataJsonConverter;
+import org.apache.kafka.common.message.DescribeQuorumRequestDataJsonConverter;
+import org.apache.kafka.common.message.DescribeQuorumResponseDataJsonConverter;
+import
org.apache.kafka.common.message.DescribeTopicPartitionsRequestDataJsonConverter;
+import
org.apache.kafka.common.message.DescribeTopicPartitionsResponseDataJsonConverter;
+import
org.apache.kafka.common.message.DescribeTransactionsRequestDataJsonConverter;
+import
org.apache.kafka.common.message.DescribeTransactionsResponseDataJsonConverter;
+import
org.apache.kafka.common.message.DescribeUserScramCredentialsRequestDataJsonConverter;
+import
org.apache.kafka.common.message.DescribeUserScramCredentialsResponseDataJsonConverter;
+import org.apache.kafka.common.message.ElectLeadersRequestDataJsonConverter;
+import org.apache.kafka.common.message.ElectLeadersResponseDataJsonConverter;
+import org.apache.kafka.common.message.EndQuorumEpochRequestDataJsonConverter;
+import org.apache.kafka.common.message.EndQuorumEpochResponseDataJsonConverter;
+import org.apache.kafka.common.message.EndTxnRequestDataJsonConverter;
+import org.apache.kafka.common.message.EndTxnResponseDataJsonConverter;
+import org.apache.kafka.common.message.EnvelopeRequestDataJsonConverter;
+import org.apache.kafka.common.message.EnvelopeResponseDataJsonConverter;
+import
org.apache.kafka.common.message.ExpireDelegationTokenRequestDataJsonConverter;
+import
org.apache.kafka.common.message.ExpireDelegationTokenResponseDataJsonConverter;
+import org.apache.kafka.common.message.FetchRequestDataJsonConverter;
+import org.apache.kafka.common.message.FetchResponseDataJsonConverter;
+import org.apache.kafka.common.message.FetchSnapshotRequestDataJsonConverter;
+import org.apache.kafka.common.message.FetchSnapshotResponseDataJsonConverter;
+import org.apache.kafka.common.message.FindCoordinatorRequestDataJsonConverter;
+import
org.apache.kafka.common.message.FindCoordinatorResponseDataJsonConverter;
+import
org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestDataJsonConverter;
+import
org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseDataJsonConverter;
+import org.apache.kafka.common.message.HeartbeatRequestDataJsonConverter;
+import org.apache.kafka.common.message.HeartbeatResponseDataJsonConverter;
+import
org.apache.kafka.common.message.IncrementalAlterConfigsRequestDataJsonConverter;
+import
org.apache.kafka.common.message.IncrementalAlterConfigsResponseDataJsonConverter;
+import org.apache.kafka.common.message.InitProducerIdRequestDataJsonConverter;
+import org.apache.kafka.common.message.InitProducerIdResponseDataJsonConverter;
+import
org.apache.kafka.common.message.InitializeShareGroupStateRequestDataJsonConverter;
+import
org.apache.kafka.common.message.InitializeShareGroupStateResponseDataJsonConverter;
+import org.apache.kafka.common.message.JoinGroupRequestDataJsonConverter;
+import org.apache.kafka.common.message.JoinGroupResponseDataJsonConverter;
+import org.apache.kafka.common.message.LeaderAndIsrRequestDataJsonConverter;
+import org.apache.kafka.common.message.LeaderAndIsrResponseDataJsonConverter;
+import org.apache.kafka.common.message.LeaveGroupRequestDataJsonConverter;
+import org.apache.kafka.common.message.LeaveGroupResponseDataJsonConverter;
+import
org.apache.kafka.common.message.ListClientMetricsResourcesRequestDataJsonConverter;
+import
org.apache.kafka.common.message.ListClientMetricsResourcesResponseDataJsonConverter;
+import org.apache.kafka.common.message.ListGroupsRequestDataJsonConverter;
+import org.apache.kafka.common.message.ListGroupsResponseDataJsonConverter;
+import org.apache.kafka.common.message.ListOffsetsRequestDataJsonConverter;
+import org.apache.kafka.common.message.ListOffsetsResponseDataJsonConverter;
+import
org.apache.kafka.common.message.ListPartitionReassignmentsRequestDataJsonConverter;
+import
org.apache.kafka.common.message.ListPartitionReassignmentsResponseDataJsonConverter;
+import
org.apache.kafka.common.message.ListTransactionsRequestDataJsonConverter;
+import
org.apache.kafka.common.message.ListTransactionsResponseDataJsonConverter;
+import org.apache.kafka.common.message.MetadataRequestDataJsonConverter;
+import org.apache.kafka.common.message.MetadataResponseDataJsonConverter;
+import org.apache.kafka.common.message.OffsetCommitRequestDataJsonConverter;
+import org.apache.kafka.common.message.OffsetCommitResponseDataJsonConverter;
+import org.apache.kafka.common.message.OffsetDeleteRequestDataJsonConverter;
+import org.apache.kafka.common.message.OffsetDeleteResponseDataJsonConverter;
+import org.apache.kafka.common.message.OffsetFetchRequestDataJsonConverter;
+import org.apache.kafka.common.message.OffsetFetchResponseDataJsonConverter;
+import
org.apache.kafka.common.message.OffsetForLeaderEpochRequestDataJsonConverter;
+import
org.apache.kafka.common.message.OffsetForLeaderEpochResponseDataJsonConverter;
+import org.apache.kafka.common.message.ProduceRequestDataJsonConverter;
+import org.apache.kafka.common.message.ProduceResponseDataJsonConverter;
+import org.apache.kafka.common.message.PushTelemetryRequestDataJsonConverter;
+import org.apache.kafka.common.message.PushTelemetryResponseDataJsonConverter;
+import
org.apache.kafka.common.message.ReadShareGroupStateRequestDataJsonConverter;
+import
org.apache.kafka.common.message.ReadShareGroupStateResponseDataJsonConverter;
+import
org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestDataJsonConverter;
+import
org.apache.kafka.common.message.ReadShareGroupStateSummaryResponseDataJsonConverter;
+import org.apache.kafka.common.message.RemoveRaftVoterRequestDataJsonConverter;
+import
org.apache.kafka.common.message.RemoveRaftVoterResponseDataJsonConverter;
+import
org.apache.kafka.common.message.RenewDelegationTokenRequestDataJsonConverter;
+import
org.apache.kafka.common.message.RenewDelegationTokenResponseDataJsonConverter;
+import org.apache.kafka.common.message.RequestHeaderDataJsonConverter;
+import
org.apache.kafka.common.message.SaslAuthenticateRequestDataJsonConverter;
+import
org.apache.kafka.common.message.SaslAuthenticateResponseDataJsonConverter;
+import org.apache.kafka.common.message.SaslHandshakeRequestDataJsonConverter;
+import org.apache.kafka.common.message.SaslHandshakeResponseDataJsonConverter;
+import
org.apache.kafka.common.message.ShareAcknowledgeRequestDataJsonConverter;
+import
org.apache.kafka.common.message.ShareAcknowledgeResponseDataJsonConverter;
+import org.apache.kafka.common.message.ShareFetchRequestDataJsonConverter;
+import org.apache.kafka.common.message.ShareFetchResponseDataJsonConverter;
+import
org.apache.kafka.common.message.ShareGroupDescribeRequestDataJsonConverter;
+import
org.apache.kafka.common.message.ShareGroupDescribeResponseDataJsonConverter;
+import
org.apache.kafka.common.message.ShareGroupHeartbeatRequestDataJsonConverter;
+import
org.apache.kafka.common.message.ShareGroupHeartbeatResponseDataJsonConverter;
+import org.apache.kafka.common.message.StopReplicaRequestDataJsonConverter;
+import org.apache.kafka.common.message.StopReplicaResponseDataJsonConverter;
+import org.apache.kafka.common.message.SyncGroupRequestDataJsonConverter;
+import org.apache.kafka.common.message.SyncGroupResponseDataJsonConverter;
+import org.apache.kafka.common.message.TxnOffsetCommitRequestDataJsonConverter;
+import
org.apache.kafka.common.message.TxnOffsetCommitResponseDataJsonConverter;
+import
org.apache.kafka.common.message.UnregisterBrokerRequestDataJsonConverter;
+import
org.apache.kafka.common.message.UnregisterBrokerResponseDataJsonConverter;
+import org.apache.kafka.common.message.UpdateFeaturesRequestDataJsonConverter;
+import org.apache.kafka.common.message.UpdateFeaturesResponseDataJsonConverter;
+import org.apache.kafka.common.message.UpdateMetadataRequestDataJsonConverter;
+import org.apache.kafka.common.message.UpdateMetadataResponseDataJsonConverter;
+import org.apache.kafka.common.message.UpdateRaftVoterRequestDataJsonConverter;
+import
org.apache.kafka.common.message.UpdateRaftVoterResponseDataJsonConverter;
+import org.apache.kafka.common.message.VoteRequestDataJsonConverter;
+import org.apache.kafka.common.message.VoteResponseDataJsonConverter;
+import
org.apache.kafka.common.message.WriteShareGroupStateRequestDataJsonConverter;
+import
org.apache.kafka.common.message.WriteShareGroupStateResponseDataJsonConverter;
+import org.apache.kafka.common.message.WriteTxnMarkersRequestDataJsonConverter;
+import
org.apache.kafka.common.message.WriteTxnMarkersResponseDataJsonConverter;
+import org.apache.kafka.common.network.ClientInformation;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.AddOffsetsToTxnRequest;
+import org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
+import org.apache.kafka.common.requests.AddPartitionsToTxnRequest;
+import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
+import org.apache.kafka.common.requests.AddRaftVoterRequest;
+import org.apache.kafka.common.requests.AddRaftVoterResponse;
+import org.apache.kafka.common.requests.AllocateProducerIdsRequest;
+import org.apache.kafka.common.requests.AllocateProducerIdsResponse;
+import org.apache.kafka.common.requests.AlterClientQuotasRequest;
+import org.apache.kafka.common.requests.AlterClientQuotasResponse;
+import org.apache.kafka.common.requests.AlterConfigsRequest;
+import org.apache.kafka.common.requests.AlterConfigsResponse;
+import org.apache.kafka.common.requests.AlterPartitionReassignmentsRequest;
+import org.apache.kafka.common.requests.AlterPartitionReassignmentsResponse;
+import org.apache.kafka.common.requests.AlterPartitionRequest;
+import org.apache.kafka.common.requests.AlterPartitionResponse;
+import org.apache.kafka.common.requests.AlterReplicaLogDirsRequest;
+import org.apache.kafka.common.requests.AlterReplicaLogDirsResponse;
+import org.apache.kafka.common.requests.AlterUserScramCredentialsRequest;
+import org.apache.kafka.common.requests.AlterUserScramCredentialsResponse;
+import org.apache.kafka.common.requests.ApiVersionsRequest;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.common.requests.AssignReplicasToDirsRequest;
+import org.apache.kafka.common.requests.AssignReplicasToDirsResponse;
+import org.apache.kafka.common.requests.BeginQuorumEpochRequest;
+import org.apache.kafka.common.requests.BeginQuorumEpochResponse;
+import org.apache.kafka.common.requests.BrokerHeartbeatRequest;
+import org.apache.kafka.common.requests.BrokerHeartbeatResponse;
+import org.apache.kafka.common.requests.BrokerRegistrationRequest;
+import org.apache.kafka.common.requests.BrokerRegistrationResponse;
+import org.apache.kafka.common.requests.ConsumerGroupDescribeRequest;
+import org.apache.kafka.common.requests.ConsumerGroupDescribeResponse;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.requests.ControlledShutdownRequest;
+import org.apache.kafka.common.requests.ControlledShutdownResponse;
+import org.apache.kafka.common.requests.ControllerRegistrationRequest;
+import org.apache.kafka.common.requests.ControllerRegistrationResponse;
+import org.apache.kafka.common.requests.CreateAclsRequest;
+import org.apache.kafka.common.requests.CreateAclsResponse;
+import org.apache.kafka.common.requests.CreateDelegationTokenRequest;
+import org.apache.kafka.common.requests.CreateDelegationTokenResponse;
+import org.apache.kafka.common.requests.CreatePartitionsRequest;
+import org.apache.kafka.common.requests.CreatePartitionsResponse;
+import org.apache.kafka.common.requests.CreateTopicsRequest;
+import org.apache.kafka.common.requests.CreateTopicsResponse;
+import org.apache.kafka.common.requests.DeleteAclsRequest;
+import org.apache.kafka.common.requests.DeleteAclsResponse;
+import org.apache.kafka.common.requests.DeleteGroupsRequest;
+import org.apache.kafka.common.requests.DeleteGroupsResponse;
+import org.apache.kafka.common.requests.DeleteRecordsRequest;
+import org.apache.kafka.common.requests.DeleteRecordsResponse;
+import org.apache.kafka.common.requests.DeleteShareGroupStateRequest;
+import org.apache.kafka.common.requests.DeleteShareGroupStateResponse;
+import org.apache.kafka.common.requests.DeleteTopicsRequest;
+import org.apache.kafka.common.requests.DeleteTopicsResponse;
+import org.apache.kafka.common.requests.DescribeAclsRequest;
+import org.apache.kafka.common.requests.DescribeAclsResponse;
+import org.apache.kafka.common.requests.DescribeClientQuotasRequest;
+import org.apache.kafka.common.requests.DescribeClientQuotasResponse;
+import org.apache.kafka.common.requests.DescribeClusterRequest;
+import org.apache.kafka.common.requests.DescribeClusterResponse;
+import org.apache.kafka.common.requests.DescribeConfigsRequest;
+import org.apache.kafka.common.requests.DescribeConfigsResponse;
+import org.apache.kafka.common.requests.DescribeDelegationTokenRequest;
+import org.apache.kafka.common.requests.DescribeDelegationTokenResponse;
+import org.apache.kafka.common.requests.DescribeGroupsRequest;
+import org.apache.kafka.common.requests.DescribeGroupsResponse;
+import org.apache.kafka.common.requests.DescribeLogDirsRequest;
+import org.apache.kafka.common.requests.DescribeLogDirsResponse;
+import org.apache.kafka.common.requests.DescribeProducersRequest;
+import org.apache.kafka.common.requests.DescribeProducersResponse;
+import org.apache.kafka.common.requests.DescribeQuorumRequest;
+import org.apache.kafka.common.requests.DescribeQuorumResponse;
+import org.apache.kafka.common.requests.DescribeTopicPartitionsRequest;
+import org.apache.kafka.common.requests.DescribeTopicPartitionsResponse;
+import org.apache.kafka.common.requests.DescribeTransactionsRequest;
+import org.apache.kafka.common.requests.DescribeTransactionsResponse;
+import org.apache.kafka.common.requests.DescribeUserScramCredentialsRequest;
+import org.apache.kafka.common.requests.DescribeUserScramCredentialsResponse;
+import org.apache.kafka.common.requests.ElectLeadersRequest;
+import org.apache.kafka.common.requests.ElectLeadersResponse;
+import org.apache.kafka.common.requests.EndQuorumEpochRequest;
+import org.apache.kafka.common.requests.EndQuorumEpochResponse;
+import org.apache.kafka.common.requests.EndTxnRequest;
+import org.apache.kafka.common.requests.EndTxnResponse;
+import org.apache.kafka.common.requests.EnvelopeRequest;
+import org.apache.kafka.common.requests.EnvelopeResponse;
+import org.apache.kafka.common.requests.ExpireDelegationTokenRequest;
+import org.apache.kafka.common.requests.ExpireDelegationTokenResponse;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.requests.FetchSnapshotRequest;
+import org.apache.kafka.common.requests.FetchSnapshotResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
+import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
+import org.apache.kafka.common.requests.HeartbeatRequest;
+import org.apache.kafka.common.requests.HeartbeatResponse;
+import org.apache.kafka.common.requests.IncrementalAlterConfigsRequest;
+import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse;
+import org.apache.kafka.common.requests.InitProducerIdRequest;
+import org.apache.kafka.common.requests.InitProducerIdResponse;
+import org.apache.kafka.common.requests.InitializeShareGroupStateRequest;
+import org.apache.kafka.common.requests.InitializeShareGroupStateResponse;
+import org.apache.kafka.common.requests.JoinGroupRequest;
+import org.apache.kafka.common.requests.JoinGroupResponse;
+import org.apache.kafka.common.requests.LeaderAndIsrRequest;
+import org.apache.kafka.common.requests.LeaderAndIsrResponse;
+import org.apache.kafka.common.requests.LeaveGroupRequest;
+import org.apache.kafka.common.requests.LeaveGroupResponse;
+import org.apache.kafka.common.requests.ListClientMetricsResourcesRequest;
+import org.apache.kafka.common.requests.ListClientMetricsResourcesResponse;
+import org.apache.kafka.common.requests.ListGroupsRequest;
+import org.apache.kafka.common.requests.ListGroupsResponse;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.requests.ListPartitionReassignmentsRequest;
+import org.apache.kafka.common.requests.ListPartitionReassignmentsResponse;
+import org.apache.kafka.common.requests.ListTransactionsRequest;
+import org.apache.kafka.common.requests.ListTransactionsResponse;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.OffsetCommitResponse;
+import org.apache.kafka.common.requests.OffsetDeleteRequest;
+import org.apache.kafka.common.requests.OffsetDeleteResponse;
+import org.apache.kafka.common.requests.OffsetFetchRequest;
+import org.apache.kafka.common.requests.OffsetFetchResponse;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
+import org.apache.kafka.common.requests.ProduceRequest;
+import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.common.requests.PushTelemetryResponse;
+import org.apache.kafka.common.requests.ReadShareGroupStateRequest;
+import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
+import org.apache.kafka.common.requests.ReadShareGroupStateSummaryRequest;
+import org.apache.kafka.common.requests.ReadShareGroupStateSummaryResponse;
+import org.apache.kafka.common.requests.RemoveRaftVoterRequest;
+import org.apache.kafka.common.requests.RemoveRaftVoterResponse;
+import org.apache.kafka.common.requests.RenewDelegationTokenRequest;
+import org.apache.kafka.common.requests.RenewDelegationTokenResponse;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.requests.SaslAuthenticateRequest;
+import org.apache.kafka.common.requests.SaslAuthenticateResponse;
+import org.apache.kafka.common.requests.SaslHandshakeRequest;
+import org.apache.kafka.common.requests.SaslHandshakeResponse;
+import org.apache.kafka.common.requests.ShareAcknowledgeRequest;
+import org.apache.kafka.common.requests.ShareAcknowledgeResponse;
+import org.apache.kafka.common.requests.ShareFetchRequest;
+import org.apache.kafka.common.requests.ShareFetchResponse;
+import org.apache.kafka.common.requests.ShareGroupDescribeRequest;
+import org.apache.kafka.common.requests.ShareGroupDescribeResponse;
+import org.apache.kafka.common.requests.ShareGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ShareGroupHeartbeatResponse;
+import org.apache.kafka.common.requests.StopReplicaRequest;
+import org.apache.kafka.common.requests.StopReplicaResponse;
+import org.apache.kafka.common.requests.SyncGroupRequest;
+import org.apache.kafka.common.requests.SyncGroupResponse;
+import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
+import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
+import org.apache.kafka.common.requests.UnregisterBrokerRequest;
+import org.apache.kafka.common.requests.UnregisterBrokerResponse;
+import org.apache.kafka.common.requests.UpdateFeaturesRequest;
+import org.apache.kafka.common.requests.UpdateFeaturesResponse;
+import org.apache.kafka.common.requests.UpdateMetadataRequest;
+import org.apache.kafka.common.requests.UpdateMetadataResponse;
+import org.apache.kafka.common.requests.UpdateRaftVoterRequest;
+import org.apache.kafka.common.requests.UpdateRaftVoterResponse;
+import org.apache.kafka.common.requests.VoteRequest;
+import org.apache.kafka.common.requests.VoteResponse;
+import org.apache.kafka.common.requests.WriteShareGroupStateRequest;
+import org.apache.kafka.common.requests.WriteShareGroupStateResponse;
+import org.apache.kafka.common.requests.WriteTxnMarkersRequest;
+import org.apache.kafka.common.requests.WriteTxnMarkersResponse;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.BooleanNode;
+import com.fasterxml.jackson.databind.node.DoubleNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.LongNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+
+import java.util.Optional;
+
+public class RequestConvertToJson {
+
+    /**
+     * Converts the data payload of {@code request} into a {@link JsonNode}.
+     *
+     * <p>The method dispatches on {@code request.apiKey()}: each handled key casts the request
+     * to its concrete class and hands {@code data()} together with {@code request.version()}
+     * to the corresponding {@code *RequestDataJsonConverter.write} method.
+     *
+     * <p>NOTE(review): PRODUCE passes an extra {@code false} argument to its converter;
+     * presumably this suppresses serialization of the record payload, confirm against
+     * {@code ProduceRequestDataJsonConverter}.
+     *
+     * @param request the request to convert
+     * @return the JSON tree produced by the converter selected for the request's api key
+     * @throws IllegalStateException if the request's api key has no case in this switch
+     */
+    public static JsonNode request(AbstractRequest request) {
+        // Every branch needs the same version, so read it once up front.
+        final short version = request.version();
+        switch (request.apiKey()) {
+            case ADD_OFFSETS_TO_TXN:
+                return AddOffsetsToTxnRequestDataJsonConverter.write(((AddOffsetsToTxnRequest) request).data(), version);
+            case ADD_PARTITIONS_TO_TXN:
+                return AddPartitionsToTxnRequestDataJsonConverter.write(((AddPartitionsToTxnRequest) request).data(), version);
+            case ALLOCATE_PRODUCER_IDS:
+                return AllocateProducerIdsRequestDataJsonConverter.write(((AllocateProducerIdsRequest) request).data(), version);
+            case ALTER_CLIENT_QUOTAS:
+                return AlterClientQuotasRequestDataJsonConverter.write(((AlterClientQuotasRequest) request).data(), version);
+            case ALTER_CONFIGS:
+                return AlterConfigsRequestDataJsonConverter.write(((AlterConfigsRequest) request).data(), version);
+            case ALTER_PARTITION_REASSIGNMENTS:
+                return AlterPartitionReassignmentsRequestDataJsonConverter.write(((AlterPartitionReassignmentsRequest) request).data(), version);
+            case ALTER_PARTITION:
+                return AlterPartitionRequestDataJsonConverter.write(((AlterPartitionRequest) request).data(), version);
+            case ALTER_REPLICA_LOG_DIRS:
+                return AlterReplicaLogDirsRequestDataJsonConverter.write(((AlterReplicaLogDirsRequest) request).data(), version);
+            case ALTER_USER_SCRAM_CREDENTIALS:
+                return AlterUserScramCredentialsRequestDataJsonConverter.write(((AlterUserScramCredentialsRequest) request).data(), version);
+            case API_VERSIONS:
+                return ApiVersionsRequestDataJsonConverter.write(((ApiVersionsRequest) request).data(), version);
+            case ASSIGN_REPLICAS_TO_DIRS:
+                return AssignReplicasToDirsRequestDataJsonConverter.write(((AssignReplicasToDirsRequest) request).data(), version);
+            case BEGIN_QUORUM_EPOCH:
+                return BeginQuorumEpochRequestDataJsonConverter.write(((BeginQuorumEpochRequest) request).data(), version);
+            case BROKER_HEARTBEAT:
+                return BrokerHeartbeatRequestDataJsonConverter.write(((BrokerHeartbeatRequest) request).data(), version);
+            case BROKER_REGISTRATION:
+                return BrokerRegistrationRequestDataJsonConverter.write(((BrokerRegistrationRequest) request).data(), version);
+            case CONSUMER_GROUP_DESCRIBE:
+                return ConsumerGroupDescribeRequestDataJsonConverter.write(((ConsumerGroupDescribeRequest) request).data(), version);
+            case CONSUMER_GROUP_HEARTBEAT:
+                return ConsumerGroupHeartbeatRequestDataJsonConverter.write(((ConsumerGroupHeartbeatRequest) request).data(), version);
+            case CONTROLLED_SHUTDOWN:
+                return ControlledShutdownRequestDataJsonConverter.write(((ControlledShutdownRequest) request).data(), version);
+            case CONTROLLER_REGISTRATION:
+                return ControllerRegistrationRequestDataJsonConverter.write(((ControllerRegistrationRequest) request).data(), version);
+            case CREATE_ACLS:
+                return CreateAclsRequestDataJsonConverter.write(((CreateAclsRequest) request).data(), version);
+            case CREATE_DELEGATION_TOKEN:
+                return CreateDelegationTokenRequestDataJsonConverter.write(((CreateDelegationTokenRequest) request).data(), version);
+            case CREATE_PARTITIONS:
+                return CreatePartitionsRequestDataJsonConverter.write(((CreatePartitionsRequest) request).data(), version);
+            case CREATE_TOPICS:
+                return CreateTopicsRequestDataJsonConverter.write(((CreateTopicsRequest) request).data(), version);
+            case DELETE_ACLS:
+                return DeleteAclsRequestDataJsonConverter.write(((DeleteAclsRequest) request).data(), version);
+            case DELETE_GROUPS:
+                return DeleteGroupsRequestDataJsonConverter.write(((DeleteGroupsRequest) request).data(), version);
+            case DELETE_RECORDS:
+                return DeleteRecordsRequestDataJsonConverter.write(((DeleteRecordsRequest) request).data(), version);
+            case DELETE_SHARE_GROUP_STATE:
+                return DeleteShareGroupStateRequestDataJsonConverter.write(((DeleteShareGroupStateRequest) request).data(), version);
+            case DELETE_TOPICS:
+                return DeleteTopicsRequestDataJsonConverter.write(((DeleteTopicsRequest) request).data(), version);
+            case DESCRIBE_ACLS:
+                return DescribeAclsRequestDataJsonConverter.write(((DescribeAclsRequest) request).data(), version);
+            case DESCRIBE_CLIENT_QUOTAS:
+                return DescribeClientQuotasRequestDataJsonConverter.write(((DescribeClientQuotasRequest) request).data(), version);
+            case DESCRIBE_CLUSTER:
+                return DescribeClusterRequestDataJsonConverter.write(((DescribeClusterRequest) request).data(), version);
+            case DESCRIBE_CONFIGS:
+                return DescribeConfigsRequestDataJsonConverter.write(((DescribeConfigsRequest) request).data(), version);
+            case DESCRIBE_DELEGATION_TOKEN:
+                return DescribeDelegationTokenRequestDataJsonConverter.write(((DescribeDelegationTokenRequest) request).data(), version);
+            case DESCRIBE_GROUPS:
+                return DescribeGroupsRequestDataJsonConverter.write(((DescribeGroupsRequest) request).data(), version);
+            case DESCRIBE_LOG_DIRS:
+                return DescribeLogDirsRequestDataJsonConverter.write(((DescribeLogDirsRequest) request).data(), version);
+            case DESCRIBE_PRODUCERS:
+                return DescribeProducersRequestDataJsonConverter.write(((DescribeProducersRequest) request).data(), version);
+            case DESCRIBE_QUORUM:
+                return DescribeQuorumRequestDataJsonConverter.write(((DescribeQuorumRequest) request).data(), version);
+            case DESCRIBE_TOPIC_PARTITIONS:
+                return DescribeTopicPartitionsRequestDataJsonConverter.write(((DescribeTopicPartitionsRequest) request).data(), version);
+            case DESCRIBE_TRANSACTIONS:
+                return DescribeTransactionsRequestDataJsonConverter.write(((DescribeTransactionsRequest) request).data(), version);
+            case DESCRIBE_USER_SCRAM_CREDENTIALS:
+                return DescribeUserScramCredentialsRequestDataJsonConverter.write(((DescribeUserScramCredentialsRequest) request).data(), version);
+            case ELECT_LEADERS:
+                return ElectLeadersRequestDataJsonConverter.write(((ElectLeadersRequest) request).data(), version);
+            case END_QUORUM_EPOCH:
+                return EndQuorumEpochRequestDataJsonConverter.write(((EndQuorumEpochRequest) request).data(), version);
+            case END_TXN:
+                return EndTxnRequestDataJsonConverter.write(((EndTxnRequest) request).data(), version);
+            case ENVELOPE:
+                return EnvelopeRequestDataJsonConverter.write(((EnvelopeRequest) request).data(), version);
+            case EXPIRE_DELEGATION_TOKEN:
+                return ExpireDelegationTokenRequestDataJsonConverter.write(((ExpireDelegationTokenRequest) request).data(), version);
+            case FETCH:
+                return FetchRequestDataJsonConverter.write(((FetchRequest) request).data(), version);
+            case FETCH_SNAPSHOT:
+                return FetchSnapshotRequestDataJsonConverter.write(((FetchSnapshotRequest) request).data(), version);
+            case FIND_COORDINATOR:
+                return FindCoordinatorRequestDataJsonConverter.write(((FindCoordinatorRequest) request).data(), version);
+            case GET_TELEMETRY_SUBSCRIPTIONS:
+                return GetTelemetrySubscriptionsRequestDataJsonConverter.write(((GetTelemetrySubscriptionsRequest) request).data(), version);
+            case HEARTBEAT:
+                return HeartbeatRequestDataJsonConverter.write(((HeartbeatRequest) request).data(), version);
+            case INCREMENTAL_ALTER_CONFIGS:
+                return IncrementalAlterConfigsRequestDataJsonConverter.write(((IncrementalAlterConfigsRequest) request).data(), version);
+            case INITIALIZE_SHARE_GROUP_STATE:
+                return InitializeShareGroupStateRequestDataJsonConverter.write(((InitializeShareGroupStateRequest) request).data(), version);
+            case INIT_PRODUCER_ID:
+                return InitProducerIdRequestDataJsonConverter.write(((InitProducerIdRequest) request).data(), version);
+            case JOIN_GROUP:
+                return JoinGroupRequestDataJsonConverter.write(((JoinGroupRequest) request).data(), version);
+            case LEADER_AND_ISR:
+                return LeaderAndIsrRequestDataJsonConverter.write(((LeaderAndIsrRequest) request).data(), version);
+            case LEAVE_GROUP:
+                return LeaveGroupRequestDataJsonConverter.write(((LeaveGroupRequest) request).data(), version);
+            case LIST_CLIENT_METRICS_RESOURCES:
+                return ListClientMetricsResourcesRequestDataJsonConverter.write(((ListClientMetricsResourcesRequest) request).data(), version);
+            case LIST_GROUPS:
+                return ListGroupsRequestDataJsonConverter.write(((ListGroupsRequest) request).data(), version);
+            case LIST_OFFSETS:
+                return ListOffsetsRequestDataJsonConverter.write(((ListOffsetsRequest) request).data(), version);
+            case LIST_PARTITION_REASSIGNMENTS:
+                return ListPartitionReassignmentsRequestDataJsonConverter.write(((ListPartitionReassignmentsRequest) request).data(), version);
+            case LIST_TRANSACTIONS:
+                return ListTransactionsRequestDataJsonConverter.write(((ListTransactionsRequest) request).data(), version);
+            case METADATA:
+                return MetadataRequestDataJsonConverter.write(((MetadataRequest) request).data(), version);
+            case OFFSET_COMMIT:
+                return OffsetCommitRequestDataJsonConverter.write(((OffsetCommitRequest) request).data(), version);
+            case OFFSET_DELETE:
+                return OffsetDeleteRequestDataJsonConverter.write(((OffsetDeleteRequest) request).data(), version);
+            case OFFSET_FETCH:
+                return OffsetFetchRequestDataJsonConverter.write(((OffsetFetchRequest) request).data(), version);
+            case OFFSET_FOR_LEADER_EPOCH:
+                return OffsetForLeaderEpochRequestDataJsonConverter.write(((OffsetsForLeaderEpochRequest) request).data(), version);
+            case PRODUCE:
+                // The only request converter in this switch that takes a third argument.
+                return ProduceRequestDataJsonConverter.write(((ProduceRequest) request).data(), version, false);
+            case PUSH_TELEMETRY:
+                return PushTelemetryRequestDataJsonConverter.write(((PushTelemetryRequest) request).data(), version);
+            case READ_SHARE_GROUP_STATE:
+                return ReadShareGroupStateRequestDataJsonConverter.write(((ReadShareGroupStateRequest) request).data(), version);
+            case READ_SHARE_GROUP_STATE_SUMMARY:
+                return ReadShareGroupStateSummaryRequestDataJsonConverter.write(((ReadShareGroupStateSummaryRequest) request).data(), version);
+            case RENEW_DELEGATION_TOKEN:
+                return RenewDelegationTokenRequestDataJsonConverter.write(((RenewDelegationTokenRequest) request).data(), version);
+            case SASL_AUTHENTICATE:
+                return SaslAuthenticateRequestDataJsonConverter.write(((SaslAuthenticateRequest) request).data(), version);
+            case SASL_HANDSHAKE:
+                return SaslHandshakeRequestDataJsonConverter.write(((SaslHandshakeRequest) request).data(), version);
+            case SHARE_ACKNOWLEDGE:
+                return ShareAcknowledgeRequestDataJsonConverter.write(((ShareAcknowledgeRequest) request).data(), version);
+            case SHARE_FETCH:
+                return ShareFetchRequestDataJsonConverter.write(((ShareFetchRequest) request).data(), version);
+            case SHARE_GROUP_DESCRIBE:
+                return ShareGroupDescribeRequestDataJsonConverter.write(((ShareGroupDescribeRequest) request).data(), version);
+            case SHARE_GROUP_HEARTBEAT:
+                return ShareGroupHeartbeatRequestDataJsonConverter.write(((ShareGroupHeartbeatRequest) request).data(), version);
+            case STOP_REPLICA:
+                return StopReplicaRequestDataJsonConverter.write(((StopReplicaRequest) request).data(), version);
+            case SYNC_GROUP:
+                return SyncGroupRequestDataJsonConverter.write(((SyncGroupRequest) request).data(), version);
+            case TXN_OFFSET_COMMIT:
+                return TxnOffsetCommitRequestDataJsonConverter.write(((TxnOffsetCommitRequest) request).data(), version);
+            case UNREGISTER_BROKER:
+                return UnregisterBrokerRequestDataJsonConverter.write(((UnregisterBrokerRequest) request).data(), version);
+            case UPDATE_FEATURES:
+                return UpdateFeaturesRequestDataJsonConverter.write(((UpdateFeaturesRequest) request).data(), version);
+            case UPDATE_METADATA:
+                return UpdateMetadataRequestDataJsonConverter.write(((UpdateMetadataRequest) request).data(), version);
+            case VOTE:
+                return VoteRequestDataJsonConverter.write(((VoteRequest) request).data(), version);
+            case WRITE_SHARE_GROUP_STATE:
+                return WriteShareGroupStateRequestDataJsonConverter.write(((WriteShareGroupStateRequest) request).data(), version);
+            case WRITE_TXN_MARKERS:
+                return WriteTxnMarkersRequestDataJsonConverter.write(((WriteTxnMarkersRequest) request).data(), version);
+            case ADD_RAFT_VOTER:
+                return AddRaftVoterRequestDataJsonConverter.write(((AddRaftVoterRequest) request).data(), version);
+            case REMOVE_RAFT_VOTER:
+                return RemoveRaftVoterRequestDataJsonConverter.write(((RemoveRaftVoterRequest) request).data(), version);
+            case UPDATE_RAFT_VOTER:
+                return UpdateRaftVoterRequestDataJsonConverter.write(((UpdateRaftVoterRequest) request).data(), version);
+            default:
+                // Reached when an api key has no case above; fail loudly so the switch gets extended.
+                throw new IllegalStateException("ApiKey " + request.apiKey() + " is not currently handled in `request`, the " +
+                    "code should be updated to do so.");
+        }
+    }
+
+ public static JsonNode response(AbstractResponse response, short version) {
+ switch (response.apiKey()) {
+ case ADD_OFFSETS_TO_TXN:
+ return
AddOffsetsToTxnResponseDataJsonConverter.write(((AddOffsetsToTxnResponse)
response).data(), version);
+ case ADD_PARTITIONS_TO_TXN:
+ return
AddPartitionsToTxnResponseDataJsonConverter.write(((AddPartitionsToTxnResponse)
response).data(), version);
+ case ALLOCATE_PRODUCER_IDS:
+ return
AllocateProducerIdsResponseDataJsonConverter.write(((AllocateProducerIdsResponse)
response).data(), version);
+ case ALTER_CLIENT_QUOTAS:
+ return
AlterClientQuotasResponseDataJsonConverter.write(((AlterClientQuotasResponse)
response).data(), version);
+ case ALTER_CONFIGS:
+ return
AlterConfigsResponseDataJsonConverter.write(((AlterConfigsResponse)
response).data(), version);
+ case ALTER_PARTITION_REASSIGNMENTS:
+ return
AlterPartitionReassignmentsResponseDataJsonConverter.write(((AlterPartitionReassignmentsResponse)
response).data(), version);
+ case ALTER_PARTITION:
+ return
AlterPartitionResponseDataJsonConverter.write(((AlterPartitionResponse)
response).data(), version);
+ case ALTER_REPLICA_LOG_DIRS:
+ return
AlterReplicaLogDirsResponseDataJsonConverter.write(((AlterReplicaLogDirsResponse)
response).data(), version);
+ case ALTER_USER_SCRAM_CREDENTIALS:
+ return
AlterUserScramCredentialsResponseDataJsonConverter.write(((AlterUserScramCredentialsResponse)
response).data(), version);
+ case API_VERSIONS:
+ return
ApiVersionsResponseDataJsonConverter.write(((ApiVersionsResponse)
response).data(), version);
+ case ASSIGN_REPLICAS_TO_DIRS:
+ return
AssignReplicasToDirsResponseDataJsonConverter.write(((AssignReplicasToDirsResponse)
response).data(), version);
+ case BEGIN_QUORUM_EPOCH:
+ return
BeginQuorumEpochResponseDataJsonConverter.write(((BeginQuorumEpochResponse)
response).data(), version);
+ case BROKER_HEARTBEAT:
+ return
BrokerHeartbeatResponseDataJsonConverter.write(((BrokerHeartbeatResponse)
response).data(), version);
+ case BROKER_REGISTRATION:
+ return
BrokerRegistrationResponseDataJsonConverter.write(((BrokerRegistrationResponse)
response).data(), version);
+ case CONSUMER_GROUP_DESCRIBE:
+ return
ConsumerGroupDescribeResponseDataJsonConverter.write(((ConsumerGroupDescribeResponse)
response).data(), version);
+ case CONSUMER_GROUP_HEARTBEAT:
+ return
ConsumerGroupHeartbeatResponseDataJsonConverter.write(((ConsumerGroupHeartbeatResponse)
response).data(), version);
+ case CONTROLLED_SHUTDOWN:
+ return
ControlledShutdownResponseDataJsonConverter.write(((ControlledShutdownResponse)
response).data(), version);
+ case CONTROLLER_REGISTRATION:
+ return
ControllerRegistrationResponseDataJsonConverter.write(((ControllerRegistrationResponse)
response).data(), version);
+ case CREATE_ACLS:
+ return
CreateAclsResponseDataJsonConverter.write(((CreateAclsResponse)
response).data(), version);
+ case CREATE_DELEGATION_TOKEN:
+ return
CreateDelegationTokenResponseDataJsonConverter.write(((CreateDelegationTokenResponse)
response).data(), version);
+ case CREATE_PARTITIONS:
+ return
CreatePartitionsResponseDataJsonConverter.write(((CreatePartitionsResponse)
response).data(), version);
+ case CREATE_TOPICS:
+ return
CreateTopicsResponseDataJsonConverter.write(((CreateTopicsResponse)
response).data(), version);
+ case DELETE_ACLS:
+ return
DeleteAclsResponseDataJsonConverter.write(((DeleteAclsResponse)
response).data(), version);
+ case DELETE_GROUPS:
+ return
DeleteGroupsResponseDataJsonConverter.write(((DeleteGroupsResponse)
response).data(), version);
+ case DELETE_RECORDS:
+ return
DeleteRecordsResponseDataJsonConverter.write(((DeleteRecordsResponse)
response).data(), version);
+ case DELETE_SHARE_GROUP_STATE:
+ return
DeleteShareGroupStateResponseDataJsonConverter.write(((DeleteShareGroupStateResponse)
response).data(), version);
+ case DELETE_TOPICS:
+ return
DeleteTopicsResponseDataJsonConverter.write(((DeleteTopicsResponse)
response).data(), version);
+ case DESCRIBE_ACLS:
+ return
DescribeAclsResponseDataJsonConverter.write(((DescribeAclsResponse)
response).data(), version);
+ case DESCRIBE_CLIENT_QUOTAS:
+ return
DescribeClientQuotasResponseDataJsonConverter.write(((DescribeClientQuotasResponse)
response).data(), version);
+ case DESCRIBE_CLUSTER:
+ return
DescribeClusterResponseDataJsonConverter.write(((DescribeClusterResponse)
response).data(), version);
+ case DESCRIBE_CONFIGS:
+ return
DescribeConfigsResponseDataJsonConverter.write(((DescribeConfigsResponse)
response).data(), version);
+ case DESCRIBE_DELEGATION_TOKEN:
+ return
DescribeDelegationTokenResponseDataJsonConverter.write(((DescribeDelegationTokenResponse)
response).data(), version);
+ case DESCRIBE_GROUPS:
+ return
DescribeGroupsResponseDataJsonConverter.write(((DescribeGroupsResponse)
response).data(), version);
+ case DESCRIBE_LOG_DIRS:
+ return
DescribeLogDirsResponseDataJsonConverter.write(((DescribeLogDirsResponse)
response).data(), version);
+ case DESCRIBE_PRODUCERS:
+ return
DescribeProducersResponseDataJsonConverter.write(((DescribeProducersResponse)
response).data(), version);
+ case DESCRIBE_QUORUM:
+ return
DescribeQuorumResponseDataJsonConverter.write(((DescribeQuorumResponse)
response).data(), version);
+ case DESCRIBE_TOPIC_PARTITIONS:
+ return
DescribeTopicPartitionsResponseDataJsonConverter.write(((DescribeTopicPartitionsResponse)
response).data(), version);
+ case DESCRIBE_TRANSACTIONS:
+ return
DescribeTransactionsResponseDataJsonConverter.write(((DescribeTransactionsResponse)
response).data(), version);
+ case DESCRIBE_USER_SCRAM_CREDENTIALS:
+ return
DescribeUserScramCredentialsResponseDataJsonConverter.write(((DescribeUserScramCredentialsResponse)
response).data(), version);
+ case ELECT_LEADERS:
+ return
ElectLeadersResponseDataJsonConverter.write(((ElectLeadersResponse)
response).data(), version);
+ case END_QUORUM_EPOCH:
+ return
EndQuorumEpochResponseDataJsonConverter.write(((EndQuorumEpochResponse)
response).data(), version);
+ case END_TXN:
+ return EndTxnResponseDataJsonConverter.write(((EndTxnResponse)
response).data(), version);
+ case ENVELOPE:
+ return
EnvelopeResponseDataJsonConverter.write(((EnvelopeResponse) response).data(),
version);
+ case EXPIRE_DELEGATION_TOKEN:
+ return
ExpireDelegationTokenResponseDataJsonConverter.write(((ExpireDelegationTokenResponse)
response).data(), version);
+ case FETCH:
+ return FetchResponseDataJsonConverter.write(((FetchResponse)
response).data(), version, false);
+ case FETCH_SNAPSHOT:
+ return
FetchSnapshotResponseDataJsonConverter.write(((FetchSnapshotResponse)
response).data(), version);
+ case FIND_COORDINATOR:
+ return
FindCoordinatorResponseDataJsonConverter.write(((FindCoordinatorResponse)
response).data(), version);
+ case GET_TELEMETRY_SUBSCRIPTIONS:
+ return
GetTelemetrySubscriptionsResponseDataJsonConverter.write(((GetTelemetrySubscriptionsResponse)
response).data(), version);
+ case HEARTBEAT:
+ return
HeartbeatResponseDataJsonConverter.write(((HeartbeatResponse) response).data(),
version);
+ case INCREMENTAL_ALTER_CONFIGS:
+ return
IncrementalAlterConfigsResponseDataJsonConverter.write(((IncrementalAlterConfigsResponse)
response).data(), version);
+ case INITIALIZE_SHARE_GROUP_STATE:
+ return
InitializeShareGroupStateResponseDataJsonConverter.write(((InitializeShareGroupStateResponse)
response).data(), version);
+ case INIT_PRODUCER_ID:
+ return
InitProducerIdResponseDataJsonConverter.write(((InitProducerIdResponse)
response).data(), version);
+ case JOIN_GROUP:
+ return
JoinGroupResponseDataJsonConverter.write(((JoinGroupResponse) response).data(),
version);
+ case LEADER_AND_ISR:
+ return
LeaderAndIsrResponseDataJsonConverter.write(((LeaderAndIsrResponse)
response).data(), version);
+ case LEAVE_GROUP:
+ return
LeaveGroupResponseDataJsonConverter.write(((LeaveGroupResponse)
response).data(), version);
+ case LIST_CLIENT_METRICS_RESOURCES:
+ return
ListClientMetricsResourcesResponseDataJsonConverter.write(((ListClientMetricsResourcesResponse)
response).data(), version);
+ case LIST_GROUPS:
+ return
ListGroupsResponseDataJsonConverter.write(((ListGroupsResponse)
response).data(), version);
+ case LIST_OFFSETS:
+ return
ListOffsetsResponseDataJsonConverter.write(((ListOffsetsResponse)
response).data(), version);
+ case LIST_PARTITION_REASSIGNMENTS:
+ return
ListPartitionReassignmentsResponseDataJsonConverter.write(((ListPartitionReassignmentsResponse)
response).data(), version);
+ case LIST_TRANSACTIONS:
+ return
ListTransactionsResponseDataJsonConverter.write(((ListTransactionsResponse)
response).data(), version);
+ case METADATA:
+ return
MetadataResponseDataJsonConverter.write(((MetadataResponse) response).data(),
version);
+ case OFFSET_COMMIT:
+ return
OffsetCommitResponseDataJsonConverter.write(((OffsetCommitResponse)
response).data(), version);
+ case OFFSET_DELETE:
+ return
OffsetDeleteResponseDataJsonConverter.write(((OffsetDeleteResponse)
response).data(), version);
+ case OFFSET_FETCH:
+ return
OffsetFetchResponseDataJsonConverter.write(((OffsetFetchResponse)
response).data(), version);
+ case OFFSET_FOR_LEADER_EPOCH:
+ return
OffsetForLeaderEpochResponseDataJsonConverter.write(((OffsetsForLeaderEpochResponse)
response).data(), version);
+ case PRODUCE:
+ return
ProduceResponseDataJsonConverter.write(((ProduceResponse) response).data(),
version);
+ case PUSH_TELEMETRY:
+ return
PushTelemetryResponseDataJsonConverter.write(((PushTelemetryResponse)
response).data(), version);
+ case READ_SHARE_GROUP_STATE:
+ return
ReadShareGroupStateResponseDataJsonConverter.write(((ReadShareGroupStateResponse)
response).data(), version);
+ case READ_SHARE_GROUP_STATE_SUMMARY:
+ return
ReadShareGroupStateSummaryResponseDataJsonConverter.write(((ReadShareGroupStateSummaryResponse)
response).data(), version);
+ case RENEW_DELEGATION_TOKEN:
+ return
RenewDelegationTokenResponseDataJsonConverter.write(((RenewDelegationTokenResponse)
response).data(), version);
+ case SASL_AUTHENTICATE:
+ return
SaslAuthenticateResponseDataJsonConverter.write(((SaslAuthenticateResponse)
response).data(), version);
+ case SASL_HANDSHAKE:
+ return
SaslHandshakeResponseDataJsonConverter.write(((SaslHandshakeResponse)
response).data(), version);
+ case SHARE_ACKNOWLEDGE:
+ return
ShareAcknowledgeResponseDataJsonConverter.write(((ShareAcknowledgeResponse)
response).data(), version);
+ case SHARE_FETCH:
+ return
ShareFetchResponseDataJsonConverter.write(((ShareFetchResponse)
response).data(), version);
+ case SHARE_GROUP_DESCRIBE:
+ return
ShareGroupDescribeResponseDataJsonConverter.write(((ShareGroupDescribeResponse)
response).data(), version);
+ case SHARE_GROUP_HEARTBEAT:
+ return
ShareGroupHeartbeatResponseDataJsonConverter.write(((ShareGroupHeartbeatResponse)
response).data(), version);
+ case STOP_REPLICA:
+ return
StopReplicaResponseDataJsonConverter.write(((StopReplicaResponse)
response).data(), version);
+ case SYNC_GROUP:
+ return
SyncGroupResponseDataJsonConverter.write(((SyncGroupResponse) response).data(),
version);
+ case TXN_OFFSET_COMMIT:
+ return
TxnOffsetCommitResponseDataJsonConverter.write(((TxnOffsetCommitResponse)
response).data(), version);
+ case UNREGISTER_BROKER:
+ return
UnregisterBrokerResponseDataJsonConverter.write(((UnregisterBrokerResponse)
response).data(), version);
+ case UPDATE_FEATURES:
+ return
UpdateFeaturesResponseDataJsonConverter.write(((UpdateFeaturesResponse)
response).data(), version);
+ case UPDATE_METADATA:
+ return
UpdateMetadataResponseDataJsonConverter.write(((UpdateMetadataResponse)
response).data(), version);
+ case VOTE:
+ return VoteResponseDataJsonConverter.write(((VoteResponse)
response).data(), version);
+ case WRITE_SHARE_GROUP_STATE:
+ return
WriteShareGroupStateResponseDataJsonConverter.write(((WriteShareGroupStateResponse)
response).data(), version);
+ case WRITE_TXN_MARKERS:
+ return
WriteTxnMarkersResponseDataJsonConverter.write(((WriteTxnMarkersResponse)
response).data(), version);
+ case ADD_RAFT_VOTER:
+ return
AddRaftVoterResponseDataJsonConverter.write(((AddRaftVoterResponse)
response).data(), version);
+ case REMOVE_RAFT_VOTER:
+ return
RemoveRaftVoterResponseDataJsonConverter.write(((RemoveRaftVoterResponse)
response).data(), version);
+ case UPDATE_RAFT_VOTER:
+ return
UpdateRaftVoterResponseDataJsonConverter.write(((UpdateRaftVoterResponse)
response).data(), version);
+ default:
+ throw new IllegalStateException("ApiKey " + response.apiKey()
+ " is not currently handled in `response`, the " +
+ "code should be updated to do so.");
+ }
+ }
+
+ /**
+  * Converts a request header to JSON and annotates it with the name of its API key.
+  *
+  * @param header the request header to convert
+  * @return the header as a JSON object with an added "requestApiKeyName" field and,
+  *         when the API key reports the header's API version as deprecated,
+  *         a "requestApiVersionDeprecated" field set to true
+  */
+ public static JsonNode requestHeaderNode(RequestHeader header) {
+     ObjectNode headerJson = (ObjectNode) RequestHeaderDataJsonConverter.write(header.data(), header.headerVersion(), false);
+     String apiKeyName = header.apiKey().toString();
+     headerJson.set("requestApiKeyName", new TextNode(apiKeyName));
+     boolean versionDeprecated = header.apiKey().isVersionDeprecated(header.apiVersion());
+     if (versionDeprecated) {
+         // The flag is only emitted for deprecated versions; it is omitted otherwise.
+         headerJson.set("requestApiVersionDeprecated", BooleanNode.TRUE);
+     }
+     return headerJson;
+ }
+
+ /**
+  * Builds the JSON description of a request.
+  *
+  * @param header      the request header, rendered through {@link #requestHeaderNode(RequestHeader)}
+  * @param requestNode the JSON body of the request; an empty text node is used when absent
+  * @param isForwarded value stored under the "isForwarded" field
+  * @return a JSON object holding the "isForwarded", "requestHeader" and "request" fields
+  */
+ public static JsonNode requestDesc(RequestHeader header, Optional<JsonNode> requestNode, boolean isForwarded) {
+     ObjectNode desc = JsonNodeFactory.instance.objectNode();
+     desc.set("isForwarded", BooleanNode.valueOf(isForwarded));
+     desc.set("requestHeader", requestHeaderNode(header));
+     JsonNode body = requestNode.isPresent() ? requestNode.get() : new TextNode("");
+     desc.set("request", body);
+     return desc;
+ }
+
+ /**
+  * Renders client software information as JSON.
+  *
+  * @param clientInfo the client information to convert
+  * @return a JSON object with "softwareName" and "softwareVersion" text fields
+  */
+ public static JsonNode clientInfoNode(ClientInformation clientInfo) {
+     ObjectNode info = JsonNodeFactory.instance.objectNode();
+     TextNode name = new TextNode(clientInfo.softwareName());
+     info.set("softwareName", name);
+     TextNode version = new TextNode(clientInfo.softwareVersion());
+     info.set("softwareVersion", version);
+     return info;
+ }
+
+ /**
+  * Builds the JSON description of a request together with its response, connection
+  * details and timing metrics.
+  *
+  * The result starts from {@link #requestDesc(RequestHeader, Optional, boolean)} and then
+  * appends the response (an empty text node when absent), the connection id, the timing
+  * values, the security protocol, principal, listener name and client information.
+  * "temporaryMemoryBytes" and "messageConversionsTime" are only written when their
+  * values are greater than zero.
+  *
+  * @return the populated JSON object
+  */
+ public static JsonNode requestDescMetrics(RequestHeader header, Optional<JsonNode> requestNode, Optional<JsonNode> responseNode,
+                                           RequestContext context, Session session, boolean isForwarded,
+                                           double totalTimeMs, double requestQueueTimeMs, double apiLocalTimeMs,
+                                           double apiRemoteTimeMs, long apiThrottleTimeMs, double responseQueueTimeMs,
+                                           double responseSendTimeMs, long temporaryMemoryBytes,
+                                           double messageConversionsTimeMs) {
+     ObjectNode metrics = (ObjectNode) requestDesc(header, requestNode, isForwarded);
+
+     JsonNode responseJson = responseNode.isPresent() ? responseNode.get() : new TextNode("");
+     metrics.set("response", responseJson);
+     metrics.set("connection", new TextNode(context.connectionId));
+
+     // Timing breakdown of the request.
+     metrics.set("totalTimeMs", new DoubleNode(totalTimeMs));
+     metrics.set("requestQueueTimeMs", new DoubleNode(requestQueueTimeMs));
+     metrics.set("localTimeMs", new DoubleNode(apiLocalTimeMs));
+     metrics.set("remoteTimeMs", new DoubleNode(apiRemoteTimeMs));
+     metrics.set("throttleTimeMs", new LongNode(apiThrottleTimeMs));
+     metrics.set("responseQueueTimeMs", new DoubleNode(responseQueueTimeMs));
+     metrics.set("sendTimeMs", new DoubleNode(responseSendTimeMs));
+
+     // Who sent the request and over which listener.
+     String protocolName = context.securityProtocol.toString();
+     metrics.set("securityProtocol", new TextNode(protocolName));
+     String principalName = session.principal.toString();
+     metrics.set("principal", new TextNode(principalName));
+     metrics.set("listener", new TextNode(context.listenerName.value()));
+     metrics.set("clientInformation", clientInfoNode(context.clientInformation));
+
+     // Optional metrics are left out entirely unless they are positive.
+     boolean hasTemporaryMemory = temporaryMemoryBytes > 0;
+     if (hasTemporaryMemory) {
+         metrics.set("temporaryMemoryBytes", new LongNode(temporaryMemoryBytes));
+     }
+     boolean hasConversionTime = messageConversionsTimeMs > 0;
+     if (hasConversionTime) {
+         metrics.set("messageConversionsTime", new DoubleNode(messageConversionsTimeMs));
+     }
+     return metrics;
+ }
+}
diff --git
a/server/src/test/java/org/apache/kafka/network/RequestConvertToJsonTest.java
b/server/src/test/java/org/apache/kafka/network/RequestConvertToJsonTest.java
new file mode 100644
index 00000000000..56c5a822432
--- /dev/null
+++
b/server/src/test/java/org/apache/kafka/network/RequestConvertToJsonTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.network;
+
+import org.apache.kafka.common.message.ApiMessageType;
+import org.apache.kafka.common.message.DescribeAclsRequestData;
+import org.apache.kafka.common.message.DescribeLogDirsResponseData;
+import org.apache.kafka.common.network.ClientInformation;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.MessageUtil;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Tests for {@link RequestConvertToJson}: checks that requests and responses for every
+ * {@link ApiKeys} value can be converted without hitting the converter's
+ * {@link IllegalStateException} fallback, and that client information is rendered
+ * with the expected fields.
+ */
+public class RequestConvertToJsonTest {
+
+    /**
+     * Converts a default request of the latest version for every API key and collects
+     * the keys for which the converter throws {@link IllegalStateException}.
+     */
+    @Test
+    public void testAllRequestTypesHandled() {
+        List<String> unhandledKeys = new ArrayList<>();
+        for (ApiKeys key : ApiKeys.values()) {
+            short version = key.latestVersion();
+            ApiMessage message = ApiMessageType.fromApiKey(key.id).newRequest();
+            if (key == ApiKeys.DESCRIBE_ACLS) {
+                // NOTE(review): presumably the default filter values are rejected when the
+                // request is parsed, hence the explicit non-default values - confirm.
+                DescribeAclsRequestData requestData = (DescribeAclsRequestData) message;
+                requestData.setPatternTypeFilter((byte) 1);
+                requestData.setResourceTypeFilter((byte) 1);
+                requestData.setPermissionType((byte) 1);
+                requestData.setOperation((byte) 1);
+            }
+            ByteBuffer bytes = MessageUtil.toByteBuffer(message, version);
+            AbstractRequest req = AbstractRequest.parseRequest(key, version, bytes).request;
+            try {
+                RequestConvertToJson.request(req);
+            } catch (IllegalStateException e) {
+                unhandledKeys.add(key.toString());
+            }
+        }
+        assertEquals(Collections.emptyList(), unhandledKeys, "Unhandled request keys");
+    }
+
+    /**
+     * Converts a default response for every version of every API key and fails for the
+     * first key that has versions the converter does not handle.
+     */
+    @Test
+    public void testAllApiVersionsResponseHandled() {
+        for (ApiKeys key : ApiKeys.values()) {
+            List<Short> unhandledVersions = new ArrayList<>();
+            for (short version : key.allVersions()) {
+                ApiMessage message = ApiMessageType.fromApiKey(key.id).newResponse();
+                // Specify top-level error handling for verifying compatibility across versions
+                if (key == ApiKeys.DESCRIBE_LOG_DIRS) {
+                    DescribeLogDirsResponseData responseData = (DescribeLogDirsResponseData) message;
+                    responseData.setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code());
+                }
+
+                ByteBuffer bytes = MessageUtil.toByteBuffer(message, version);
+                AbstractResponse response = AbstractResponse.parseResponse(key, bytes, version);
+                try {
+                    RequestConvertToJson.response(response, version);
+                } catch (IllegalStateException e) {
+                    unhandledVersions.add(version);
+                }
+            }
+            // This test exercises responses, so the failure message names response versions.
+            assertEquals(Collections.emptyList(), unhandledVersions, "API: " + key + " - Unhandled response versions");
+        }
+    }
+
+    /**
+     * Converts a default response of the latest version for every API key and collects
+     * the keys for which the converter throws {@link IllegalStateException}.
+     */
+    @Test
+    public void testAllResponseTypesHandled() {
+        List<String> unhandledKeys = new ArrayList<>();
+        for (ApiKeys key : ApiKeys.values()) {
+            short version = key.latestVersion();
+            ApiMessage message = ApiMessageType.fromApiKey(key.id).newResponse();
+            ByteBuffer bytes = MessageUtil.toByteBuffer(message, version);
+            AbstractResponse res = AbstractResponse.parseResponse(key, bytes, version);
+            try {
+                RequestConvertToJson.response(res, version);
+            } catch (IllegalStateException e) {
+                unhandledKeys.add(key.toString());
+            }
+        }
+        assertEquals(Collections.emptyList(), unhandledKeys, "Unhandled response keys");
+    }
+
+    /**
+     * Checks that {@code clientInfoNode} produces an object holding exactly the
+     * "softwareName" and "softwareVersion" text fields of the client information.
+     */
+    @Test
+    public void testClientInfoNode() {
+        ClientInformation clientInfo = new ClientInformation("name", "1");
+        ObjectNode expectedNode = JsonNodeFactory.instance.objectNode();
+        expectedNode.set("softwareName", new TextNode(clientInfo.softwareName()));
+        expectedNode.set("softwareVersion", new TextNode(clientInfo.softwareVersion()));
+        JsonNode actualNode = RequestConvertToJson.clientInfoNode(clientInfo);
+        assertEquals(expectedNode, actualNode);
+    }
+}