Hi Team,

 

Greetings,

 

Apologies for the delay in reply as I was down with flu.

 

We actually reached out to you for IT/ SAP/ Oracle/ Infor / Microsoft “VOTEC IT 
SERVICE PARTNERSHIP”  “IT SERVICE OUTSOURCING” “ “PARTNER SERVICE 
SUBCONTRACTING”

 

We have very attractive newly introduce reasonably price PARTNER IT SERVICE ODC 
SUBCONTRACTING MODEL in USA, Philippines, India and Singapore etc with White 
Label Model.

 

Our LOW COST IT SERVICE ODC MODEL eliminate the cost of expensive employee 
payroll, Help partner to get profit more than 50% on each project.. ..We really 
mean it.

 

We are already working with platinum partner like NTT DATA, NEC Singapore, 
Deloitte, Hitachi consulting. ACCENTURE, Abeam Singapore etc.

 

Are u keen to understand VOTEC IT SERVICE MODEL PARTNERSHIP offerings?

 

Let us know your availability this week OR Next week?? We can arrange 
discussion with Partner Manager.
> On 01/25/2024 4:16 AM +08 davidart...@apache.org wrote:
> 
>  
> This is an automated email from the ASF dual-hosted git repository.
> 
> davidarthur pushed a commit to branch trunk
> in repository https://gitbox.apache.org/repos/asf/kafka.git
> 
> 
> The following commit(s) were added to refs/heads/trunk by this push:
>      new 7e5ef9b509a KAFKA-15585: Implement DescribeTopicPartitions RPC on 
> broker (#14612)
> 7e5ef9b509a is described below
> 
> commit 7e5ef9b509a00973d271def5f70ca2062208e778
> Author: Calvin Liu <83986057+calvinconflu...@users.noreply.github.com>
> AuthorDate: Wed Jan 24 12:16:09 2024 -0800
> 
>     KAFKA-15585: Implement DescribeTopicPartitions RPC on broker (#14612)
>     
>     This patch implements the new DescribeTopicPartitions RPC as defined in 
> KIP-966 (ELR). Additionally, this patch adds a broker config 
> "max.request.partition.size.limit" which limits the number of partitions 
> returned by the new RPC.
>     
>     Reviewers: Artem Livshits <alivsh...@confluent.io>, Jason Gustafson 
> <ja...@confluent.io>, David Arthur <mum...@gmail.com>
> ---
>  checkstyle/suppressions.xml                        |   3 +
>  .../org/apache/kafka/common/protocol/ApiKeys.java  |   3 +-
>  .../kafka/common/requests/AbstractRequest.java     |   2 +
>  .../kafka/common/requests/AbstractResponse.java    |   2 +
>  .../requests/DescribeTopicPartitionsRequest.java   |  99 ++++
>  .../requests/DescribeTopicPartitionsResponse.java  |  83 ++++
>  .../message/DescribeTopicPartitionsRequest.json    |  40 ++
>  .../message/DescribeTopicPartitionsResponse.json   |  66 +++
>  .../kafka/common/requests/RequestResponseTest.java |  39 ++
>  .../DescribeTopicPartitionsRequestHandler.java     | 130 +++++
>  .../scala/kafka/network/RequestConvertToJson.scala |   4 +-
>  core/src/main/scala/kafka/server/KafkaApis.scala   |  32 +-
>  core/src/main/scala/kafka/server/KafkaConfig.scala |  12 +
>  .../kafka/server/metadata/KRaftMetadataCache.scala | 167 ++++++-
>  .../DescribeTopicPartitionsRequestHandlerTest.java | 545 
> +++++++++++++++++++++
>  .../scala/unit/kafka/server/KafkaApisTest.scala    |  59 +--
>  .../unit/kafka/server/MetadataCacheTest.scala      | 183 ++++++-
>  .../scala/unit/kafka/server/RequestQuotaTest.scala |   3 +
>  .../kafka/controller/PartitionChangeBuilder.java   |   8 +-
>  .../kafka/metadata/PartitionRegistration.java      |   6 +-
>  .../common/metadata/PartitionChangeRecord.json     |   6 +-
>  .../resources/common/metadata/PartitionRecord.json |   4 +-
>  .../controller/PartitionChangeBuilderTest.java     |  18 +-
>  .../kafka/metadata/PartitionRegistrationTest.java  |   2 +-
>  .../org/apache/kafka/server/config/Defaults.java   |   3 +
>  25 files changed, 1439 insertions(+), 80 deletions(-)
> 
> diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
> index dae818c79b0..7df07751acf 100644
> --- a/checkstyle/suppressions.xml
> +++ b/checkstyle/suppressions.xml
> @@ -44,6 +44,9 @@
>      <suppress checks="MethodLength"
>                files="(KafkaClusterTestKit).java"/>
>  
> +    <!-- server tests -->
> +    <suppress checks="MethodLength|JavaNCSS|NPath" 
> files="DescribeTopicPartitionsRequestHandlerTest.java"/>
> +
>      <!-- Clients -->
>      <suppress id="dontUseSystemExit"
>                files="Exit.java"/>
> diff --git 
> a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java 
> b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
> index ee0d773b070..16bec4fb72d 100644
> --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
> +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
> @@ -117,7 +117,8 @@ public enum ApiKeys {
>      GET_TELEMETRY_SUBSCRIPTIONS(ApiMessageType.GET_TELEMETRY_SUBSCRIPTIONS),
>      PUSH_TELEMETRY(ApiMessageType.PUSH_TELEMETRY),
>      ASSIGN_REPLICAS_TO_DIRS(ApiMessageType.ASSIGN_REPLICAS_TO_DIRS),
> -    
> LIST_CLIENT_METRICS_RESOURCES(ApiMessageType.LIST_CLIENT_METRICS_RESOURCES);
> +    
> LIST_CLIENT_METRICS_RESOURCES(ApiMessageType.LIST_CLIENT_METRICS_RESOURCES),
> +    DESCRIBE_TOPIC_PARTITIONS(ApiMessageType.DESCRIBE_TOPIC_PARTITIONS);
>  
>      private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> 
> APIS_BY_LISTENER =
>          new EnumMap<>(ApiMessageType.ListenerType.class);
> diff --git 
> a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java 
> b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
> index 23f67cb5273..b51221f5af6 100644
> --- 
> a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
> +++ 
> b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
> @@ -324,6 +324,8 @@ public abstract class AbstractRequest implements 
> AbstractRequestResponse {
>                  return AssignReplicasToDirsRequest.parse(buffer, apiVersion);
>              case LIST_CLIENT_METRICS_RESOURCES:
>                  return ListClientMetricsResourcesRequest.parse(buffer, 
> apiVersion);
> +            case DESCRIBE_TOPIC_PARTITIONS:
> +                return DescribeTopicPartitionsRequest.parse(buffer, 
> apiVersion);
>              default:
>                  throw new AssertionError(String.format("ApiKey %s is not 
> currently handled in `parseRequest`, the " +
>                          "code should be updated to do so.", apiKey));
> diff --git 
> a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
>  
> b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
> index f99da4e2119..dbafdbf3bcb 100644
> --- 
> a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
> +++ 
> b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
> @@ -261,6 +261,8 @@ public abstract class AbstractResponse implements 
> AbstractRequestResponse {
>                  return AssignReplicasToDirsResponse.parse(responseBuffer, 
> version);
>              case LIST_CLIENT_METRICS_RESOURCES:
>                  return 
> ListClientMetricsResourcesResponse.parse(responseBuffer, version);
> +            case DESCRIBE_TOPIC_PARTITIONS:
> +                return DescribeTopicPartitionsResponse.parse(responseBuffer, 
> version);
>              default:
>                  throw new AssertionError(String.format("ApiKey %s is not 
> currently handled in `parseResponse`, the " +
>                          "code should be updated to do so.", apiKey));
> diff --git 
> a/clients/src/main/java/org/apache/kafka/common/requests/DescribeTopicPartitionsRequest.java
>  
> b/clients/src/main/java/org/apache/kafka/common/requests/DescribeTopicPartitionsRequest.java
> new file mode 100644
> index 00000000000..588c562f1ed
> --- /dev/null
> +++ 
> b/clients/src/main/java/org/apache/kafka/common/requests/DescribeTopicPartitionsRequest.java
> @@ -0,0 +1,99 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements. See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + *    http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +package org.apache.kafka.common.requests;
> +
> +import org.apache.kafka.common.message.DescribeTopicPartitionsRequestData;
> +import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;
> +import org.apache.kafka.common.protocol.ApiKeys;
> +import org.apache.kafka.common.protocol.ByteBufferAccessor;
> +import org.apache.kafka.common.protocol.Errors;
> +
> +import java.nio.ByteBuffer;
> +import java.util.Collections;
> +import java.util.List;
> +
> +public class DescribeTopicPartitionsRequest extends AbstractRequest {
> +    public static class Builder extends 
> AbstractRequest.Builder<DescribeTopicPartitionsRequest> {
> +        private final DescribeTopicPartitionsRequestData data;
> +
> +        public Builder(DescribeTopicPartitionsRequestData data) {
> +            super(ApiKeys.DESCRIBE_TOPIC_PARTITIONS);
> +            this.data = data;
> +        }
> +
> +        public Builder(List<String> topics) {
> +            super(ApiKeys.DESCRIBE_TOPIC_PARTITIONS, 
> ApiKeys.DESCRIBE_TOPIC_PARTITIONS.oldestVersion(),
> +                ApiKeys.DESCRIBE_TOPIC_PARTITIONS.latestVersion());
> +            DescribeTopicPartitionsRequestData data = new 
> DescribeTopicPartitionsRequestData();
> +            topics.forEach(topicName -> data.topics().add(
> +                new 
> DescribeTopicPartitionsRequestData.TopicRequest().setName(topicName))
> +            );
> +            this.data = data;
> +        }
> +
> +        @Override
> +        public DescribeTopicPartitionsRequest build(short version) {
> +            return new DescribeTopicPartitionsRequest(data, version);
> +        }
> +
> +        @Override
> +        public String toString() {
> +            return data.toString();
> +        }
> +
> +    }
> +
> +    private final DescribeTopicPartitionsRequestData data;
> +
> +    public DescribeTopicPartitionsRequest(DescribeTopicPartitionsRequestData 
> data) {
> +        super(ApiKeys.DESCRIBE_TOPIC_PARTITIONS, (short) 0);
> +        this.data = data;
> +    }
> +
> +    public DescribeTopicPartitionsRequest(DescribeTopicPartitionsRequestData 
> data, short version) {
> +        super(ApiKeys.DESCRIBE_TOPIC_PARTITIONS, version);
> +        this.data = data;
> +    }
> +
> +    @Override
> +    public DescribeTopicPartitionsRequestData data() {
> +        return data;
> +    }
> +
> +    @Override
> +    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable 
> e) {
> +        Errors error = Errors.forException(e);
> +        DescribeTopicPartitionsResponseData responseData = new 
> DescribeTopicPartitionsResponseData();
> +        for (DescribeTopicPartitionsRequestData.TopicRequest topic : 
> data.topics()) {
> +            responseData.topics().add(new 
> DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopic()
> +                .setName(topic.name())
> +                .setErrorCode(error.code())
> +                .setIsInternal(false)
> +                .setPartitions(Collections.emptyList())
> +            );
> +        }
> +        responseData.setThrottleTimeMs(throttleTimeMs);
> +        return new DescribeTopicPartitionsResponse(responseData);
> +    }
> +
> +    public static DescribeTopicPartitionsRequest parse(ByteBuffer buffer, 
> short version) {
> +        return new DescribeTopicPartitionsRequest(
> +            new DescribeTopicPartitionsRequestData(new 
> ByteBufferAccessor(buffer), version),
> +            version);
> +    }
> +}
> diff --git 
> a/clients/src/main/java/org/apache/kafka/common/requests/DescribeTopicPartitionsResponse.java
>  
> b/clients/src/main/java/org/apache/kafka/common/requests/DescribeTopicPartitionsResponse.java
> new file mode 100644
> index 00000000000..e92f03d6b73
> --- /dev/null
> +++ 
> b/clients/src/main/java/org/apache/kafka/common/requests/DescribeTopicPartitionsResponse.java
> @@ -0,0 +1,83 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements. See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + *    http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +package org.apache.kafka.common.requests;
> +
> +import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;
> +import 
> org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopic;
> +import org.apache.kafka.common.protocol.ApiKeys;
> +import org.apache.kafka.common.protocol.ByteBufferAccessor;
> +import org.apache.kafka.common.protocol.Errors;
> +
> +import java.nio.ByteBuffer;
> +import java.util.HashMap;
> +import java.util.List;
> +import java.util.Map;
> +
> +public class DescribeTopicPartitionsResponse extends AbstractResponse {
> +    private final DescribeTopicPartitionsResponseData data;
> +
> +    public 
> DescribeTopicPartitionsResponse(DescribeTopicPartitionsResponseData data) {
> +        super(ApiKeys.DESCRIBE_TOPIC_PARTITIONS);
> +        this.data = data;
> +    }
> +
> +    @Override
> +    public DescribeTopicPartitionsResponseData data() {
> +        return data;
> +    }
> +
> +    @Override
> +    public int throttleTimeMs() {
> +        return data.throttleTimeMs();
> +    }
> +
> +    @Override
> +    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
> +        data.setThrottleTimeMs(throttleTimeMs);
> +    }
> +
> +    @Override
> +    public boolean shouldClientThrottle(short version) {
> +        return true;
> +    }
> +
> +    @Override
> +    public Map<Errors, Integer> errorCounts() {
> +        Map<Errors, Integer> errorCounts = new HashMap<>();
> +        data.topics().forEach(topicResponse -> {
> +            topicResponse.partitions().forEach(p -> 
> updateErrorCounts(errorCounts, Errors.forCode(p.errorCode())));
> +            updateErrorCounts(errorCounts, 
> Errors.forCode(topicResponse.errorCode()));
> +        });
> +        return errorCounts;
> +    }
> +
> +    public static DescribeTopicPartitionsResponse prepareResponse(
> +        int throttleTimeMs,
> +        List<DescribeTopicPartitionsResponseTopic> topics
> +    ) {
> +        DescribeTopicPartitionsResponseData responseData = new 
> DescribeTopicPartitionsResponseData();
> +        responseData.setThrottleTimeMs(throttleTimeMs);
> +        topics.forEach(topicResponse -> 
> responseData.topics().add(topicResponse));
> +        return new DescribeTopicPartitionsResponse(responseData);
> +    }
> +
> +    public static DescribeTopicPartitionsResponse parse(ByteBuffer buffer, 
> short version) {
> +        return new DescribeTopicPartitionsResponse(
> +            new DescribeTopicPartitionsResponseData(new 
> ByteBufferAccessor(buffer), version));
> +    }
> +}
> diff --git 
> a/clients/src/main/resources/common/message/DescribeTopicPartitionsRequest.json
>  
> b/clients/src/main/resources/common/message/DescribeTopicPartitionsRequest.json
> new file mode 100644
> index 00000000000..63c5b5c32ad
> --- /dev/null
> +++ 
> b/clients/src/main/resources/common/message/DescribeTopicPartitionsRequest.json
> @@ -0,0 +1,40 @@
> +// Licensed to the Apache Software Foundation (ASF) under one or more
> +// contributor license agreements.  See the NOTICE file distributed with
> +// this work for additional information regarding copyright ownership.
> +// The ASF licenses this file to You under the Apache License, Version 2.0
> +// (the "License"); you may not use this file except in compliance with
> +// the License.  You may obtain a copy of the License at
> +//
> +//    http://www.apache.org/licenses/LICENSE-2.0
> +//
> +// Unless required by applicable law or agreed to in writing, software
> +// distributed under the License is distributed on an "AS IS" BASIS,
> +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> +// See the License for the specific language governing permissions and
> +// limitations under the License.
> +
> +{
> +  "apiKey": 75,
> +  "type": "request",
> +  "listeners": ["broker"],
> +  "name": "DescribeTopicPartitionsRequest",
> +  "validVersions": "0",
> +  "flexibleVersions": "0+",
> +  "fields": [
> +    { "name": "Topics", "type": "[]TopicRequest", "versions": "0+",
> +      "about": "The topics to fetch details for.",
> +      "fields": [
> +        { "name": "Name", "type": "string", "versions": "0+",
> +          "about": "The topic name", "versions": "0+", "entityType": 
> "topicName"}
> +      ]
> +    },
> +    { "name": "ResponsePartitionLimit", "type": "int32", "versions": "0+", 
> "default": "2000",
> +      "about": "The maximum number of partitions included in the response." 
> },
> +    { "name": "Cursor", "type": "Cursor", "versions": "0+", 
> "nullableVersions": "0+", "default": "null",
> +      "about": "The first topic and partition index to fetch details for.", 
> "fields": [
> +      { "name": "TopicName", "type": "string", "versions": "0+",
> +        "about": "The name for the first topic to process", "versions": 
> "0+", "entityType": "topicName"},
> +      { "name": "PartitionIndex", "type": "int32", "versions": "0+", 
> "about": "The partition index to start with"}
> +    ]}
> +  ]
> +}
> diff --git 
> a/clients/src/main/resources/common/message/DescribeTopicPartitionsResponse.json
>  
> b/clients/src/main/resources/common/message/DescribeTopicPartitionsResponse.json
> new file mode 100644
> index 00000000000..e8eee7dcb64
> --- /dev/null
> +++ 
> b/clients/src/main/resources/common/message/DescribeTopicPartitionsResponse.json
> @@ -0,0 +1,66 @@
> +// Licensed to the Apache Software Foundation (ASF) under one or more
> +// contributor license agreements.  See the NOTICE file distributed with
> +// this work for additional information regarding copyright ownership.
> +// The ASF licenses this file to You under the Apache License, Version 2.0
> +// (the "License"); you may not use this file except in compliance with
> +// the License.  You may obtain a copy of the License at
> +//
> +//    http://www.apache.org/licenses/LICENSE-2.0
> +//
> +// Unless required by applicable law or agreed to in writing, software
> +// distributed under the License is distributed on an "AS IS" BASIS,
> +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> +// See the License for the specific language governing permissions and
> +// limitations under the License.
> +
> +{
> +  "apiKey": 75,
> +  "type": "response",
> +  "name": "DescribeTopicPartitionsResponse",
> +  "validVersions": "0",
> +  "flexibleVersions": "0+",
> +  "fields": [
> +    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", 
> "ignorable": true,
> +      "about": "The duration in milliseconds for which the request was 
> throttled due to a quota violation, or zero if the request did not violate 
> any quota." },
> +    { "name": "Topics", "type": "[]DescribeTopicPartitionsResponseTopic", 
> "versions": "0+",
> +      "about": "Each topic in the response.", "fields": [
> +      { "name": "ErrorCode", "type": "int16", "versions": "0+",
> +        "about": "The topic error, or 0 if there was no error." },
> +      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, 
> "entityType": "topicName", "nullableVersions": "0+",
> +        "about": "The topic name." },
> +      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": 
> true, "about": "The topic id." },
> +      { "name": "IsInternal", "type": "bool", "versions": "0+", "default": 
> "false", "ignorable": true,
> +        "about": "True if the topic is internal." },
> +      { "name": "Partitions", "type": 
> "[]DescribeTopicPartitionsResponsePartition", "versions": "0+",
> +        "about": "Each partition in the topic.", "fields": [
> +        { "name": "ErrorCode", "type": "int16", "versions": "0+",
> +          "about": "The partition error, or 0 if there was no error." },
> +        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
> +          "about": "The partition index." },
> +        { "name": "LeaderId", "type": "int32", "versions": "0+", 
> "entityType": "brokerId",
> +          "about": "The ID of the leader broker." },
> +        { "name": "LeaderEpoch", "type": "int32", "versions": "0+", 
> "default": "-1", "ignorable": true,
> +          "about": "The leader epoch of this partition." },
> +        { "name": "ReplicaNodes", "type": "[]int32", "versions": "0+", 
> "entityType": "brokerId",
> +          "about": "The set of all nodes that host this partition." },
> +        { "name": "IsrNodes", "type": "[]int32", "versions": "0+", 
> "entityType": "brokerId",
> +          "about": "The set of nodes that are in sync with the leader for 
> this partition." },
> +        { "name": "EligibleLeaderReplicas", "type": "[]int32", "default": 
> "null", "entityType": "brokerId",
> +          "versions": "0+", "nullableVersions": "0+",
> +          "about": "The new eligible leader replicas otherwise." },
> +        { "name": "LastKnownElr", "type": "[]int32", "default": "null", 
> "entityType": "brokerId",
> +          "versions": "0+", "nullableVersions": "0+",
> +          "about": "The last known ELR." },
> +        { "name": "OfflineReplicas", "type": "[]int32", "versions": "0+", 
> "ignorable": true, "entityType": "brokerId",
> +          "about": "The set of offline replicas of this partition." }]},
> +      { "name": "TopicAuthorizedOperations", "type": "int32", "versions": 
> "0+", "default": "-2147483648",
> +        "about": "32-bit bitfield to represent authorized operations for 
> this topic." }]
> +    },
> +    { "name": "NextCursor", "type": "Cursor", "versions": "0+", 
> "nullableVersions": "0+", "default": "null",
> +      "about": "The next topic and partition index to fetch details for.", 
> "fields": [
> +      { "name": "TopicName", "type": "string", "versions": "0+",
> +        "about": "The name for the first topic to process", "versions": 
> "0+", "entityType": "topicName"},
> +      { "name": "PartitionIndex", "type": "int32", "versions": "0+", 
> "about": "The partition index to start with"}
> +    ]}
> +  ]
> +}
> diff --git 
> a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
>  
> b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
> index 4a19a3a1683..b1fdf35d8e3 100644
> --- 
> a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
> +++ 
> b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
> @@ -130,6 +130,8 @@ import 
> org.apache.kafka.common.message.DescribeProducersRequestData;
>  import org.apache.kafka.common.message.DescribeProducersResponseData;
>  import org.apache.kafka.common.message.DescribeQuorumRequestData;
>  import org.apache.kafka.common.message.DescribeQuorumResponseData;
> +import org.apache.kafka.common.message.DescribeTopicPartitionsRequestData;
> +import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;
>  import org.apache.kafka.common.message.DescribeTransactionsRequestData;
>  import org.apache.kafka.common.message.DescribeTransactionsResponseData;
>  import 
> org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData;
> @@ -1077,6 +1079,7 @@ public class RequestResponseTest {
>              case PUSH_TELEMETRY: return createPushTelemetryRequest(version);
>              case ASSIGN_REPLICAS_TO_DIRS: return 
> createAssignReplicasToDirsRequest(version);
>              case LIST_CLIENT_METRICS_RESOURCES: return 
> createListClientMetricsResourcesRequest(version);
> +            case DESCRIBE_TOPIC_PARTITIONS: return 
> createDescribeTopicPartitionsRequest(version);
>              default: throw new IllegalArgumentException("Unknown API key " + 
> apikey);
>          }
>      }
> @@ -1158,6 +1161,7 @@ public class RequestResponseTest {
>              case PUSH_TELEMETRY: return createPushTelemetryResponse();
>              case ASSIGN_REPLICAS_TO_DIRS: return 
> createAssignReplicasToDirsResponse();
>              case LIST_CLIENT_METRICS_RESOURCES: return 
> createListClientMetricsResourcesResponse();
> +            case DESCRIBE_TOPIC_PARTITIONS: return 
> createDescribeTopicPartitionsResponse();
>              default: throw new IllegalArgumentException("Unknown API key " + 
> apikey);
>          }
>      }
> @@ -1251,6 +1255,41 @@ public class RequestResponseTest {
>          return new AssignReplicasToDirsResponse(data);
>      }
>  
> +    private DescribeTopicPartitionsRequest 
> createDescribeTopicPartitionsRequest(short version) {
> +        DescribeTopicPartitionsRequestData data = new 
> DescribeTopicPartitionsRequestData()
> +                .setTopics(Arrays.asList(new 
> DescribeTopicPartitionsRequestData.TopicRequest().setName("foo")))
> +                .setCursor(new 
> DescribeTopicPartitionsRequestData.Cursor().setTopicName("foo").setPartitionIndex(1));
> +        return new 
> DescribeTopicPartitionsRequest.Builder(data).build(version);
> +    }
> +
> +    private DescribeTopicPartitionsResponse 
> createDescribeTopicPartitionsResponse() {
> +        
> DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopicCollection
>  collection =
> +                new 
> DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopicCollection();
> +        collection.add(
> +                new 
> DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopic()
> +                        
> .setTopicId(Uuid.fromString("sKhZV8LnTA275KvByB9bVg"))
> +                        .setErrorCode((short) 0)
> +                        .setIsInternal(false)
> +                        .setName("foo")
> +                        .setTopicAuthorizedOperations(0)
> +                        .setPartitions(Arrays.asList(
> +                                new 
> DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponsePartition()
> +                                        .setErrorCode((short) 0)
> +                                        .setIsrNodes(Arrays.asList(1))
> +                                        .setPartitionIndex(1)
> +                                        .setLeaderId(1)
> +                                        .setReplicaNodes(Arrays.asList(1))
> +                                        .setLeaderEpoch(0)
> +                        ))
> +        );
> +        DescribeTopicPartitionsResponseData data = new 
> DescribeTopicPartitionsResponseData()
> +                .setTopics(collection)
> +                .setNextCursor(
> +                        new 
> DescribeTopicPartitionsResponseData.Cursor().setTopicName("foo").setPartitionIndex(2)
> +                );
> +        return new DescribeTopicPartitionsResponse(data);
> +    }
> +
>      private ConsumerGroupHeartbeatRequest 
> createConsumerGroupHeartbeatRequest(short version) {
>          ConsumerGroupHeartbeatRequestData data = new 
> ConsumerGroupHeartbeatRequestData()
>              .setGroupId("group")
> diff --git 
> a/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java
>  
> b/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java
> new file mode 100644
> index 00000000000..e9ec3d8fae9
> --- /dev/null
> +++ 
> b/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java
> @@ -0,0 +1,130 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements. See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + *    http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +package kafka.server.handlers;
> +
> +import kafka.network.RequestChannel;
> +import kafka.server.AuthHelper;
> +import kafka.server.KafkaConfig;
> +import kafka.server.metadata.KRaftMetadataCache;
> +import org.apache.kafka.common.Uuid;
> +import org.apache.kafka.common.errors.InvalidRequestException;
> +import org.apache.kafka.common.message.DescribeTopicPartitionsRequestData;
> +import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;
> +import 
> org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponsePartition;
> +import 
> org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopic;
> +import org.apache.kafka.common.protocol.Errors;
> +import org.apache.kafka.common.requests.DescribeTopicPartitionsRequest;
> +import org.apache.kafka.common.resource.Resource;
> +import scala.collection.JavaConverters;
> +
> +import java.util.Collections;
> +import java.util.HashSet;
> +import java.util.List;
> +import java.util.Set;
> +import java.util.stream.Stream;
> +
> +import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
> +import static org.apache.kafka.common.resource.ResourceType.TOPIC;
> +
> +public class DescribeTopicPartitionsRequestHandler {
> +    KRaftMetadataCache metadataCache;
> +    AuthHelper authHelper;
> +    KafkaConfig config;
> +
> +    public DescribeTopicPartitionsRequestHandler(
> +        KRaftMetadataCache metadataCache,
> +        AuthHelper authHelper,
> +        KafkaConfig config
> +    ) {
> +        this.metadataCache = metadataCache;
> +        this.authHelper = authHelper;
> +        this.config = config;
> +    }
> +
> +    public DescribeTopicPartitionsResponseData 
> handleDescribeTopicPartitionsRequest(RequestChannel.Request abstractRequest) {
> +        DescribeTopicPartitionsRequestData request = 
> ((DescribeTopicPartitionsRequest) abstractRequest.loggableRequest()).data();
> +        Set<String> topics = new HashSet<>();
> +        boolean fetchAllTopics = request.topics().isEmpty();
> +        DescribeTopicPartitionsRequestData.Cursor cursor = request.cursor();
> +        String cursorTopicName = cursor != null ? cursor.topicName() : "";
> +        if (fetchAllTopics) {
> +            
> JavaConverters.asJavaCollection(metadataCache.getAllTopics()).forEach(topicName
>  -> {
> +                if (topicName.compareTo(cursorTopicName) >= 0) {
> +                    topics.add(topicName);
> +                }
> +            });
> +        } else {
> +            request.topics().forEach(topic -> {
> +                String topicName = topic.name();
> +                if (topicName.compareTo(cursorTopicName) >= 0) {
> +                    topics.add(topicName);
> +                }
> +            });
> +
> +            if (cursor != null && !topics.contains(cursor.topicName())) {
> +                // The topic in cursor must be included in the topic list if 
> provided.
> +                throw new 
> InvalidRequestException("DescribeTopicPartitionsRequest topic list should 
> contain the cursor topic: " + cursor.topicName());
> +            }
> +        }
> +
> +        // Do not disclose the existence of topics unauthorized for 
> Describe, so we've not even checked if they exist or not
> +        Set<DescribeTopicPartitionsResponseTopic> 
> unauthorizedForDescribeTopicMetadata = new HashSet<>();
> +
> +        Stream<String> authorizedTopicsStream = 
> topics.stream().sorted().filter(topicName -> {
> +            boolean isAuthorized = authHelper.authorize(
> +                abstractRequest.context(), DESCRIBE, TOPIC, topicName, true, 
> true, 1);
> +            if (!fetchAllTopics && !isAuthorized) {
> +                // We should not return topicId when on unauthorized error, 
> so we return zero uuid.
> +                
> unauthorizedForDescribeTopicMetadata.add(describeTopicPartitionsResponseTopic(
> +                    Errors.TOPIC_AUTHORIZATION_FAILED, topicName, 
> Uuid.ZERO_UUID, false, Collections.emptyList())
> +                );
> +            }
> +            return isAuthorized;
> +        });
> +
> +        DescribeTopicPartitionsResponseData response = 
> metadataCache.getTopicMetadataForDescribeTopicResponse(
> +            
> JavaConverters.asScalaIterator(authorizedTopicsStream.iterator()),
> +            abstractRequest.context().listenerName,
> +            (String topicName) -> topicName.equals(cursorTopicName) ? 
> cursor.partitionIndex() : 0,
> +            Math.min(config.maxRequestPartitionSizeLimit(), 
> request.responsePartitionLimit()),
> +            fetchAllTopics
> +        );
> +
> +        // get topic authorized operations
> +        response.topics().forEach(topicData ->
> +            
> topicData.setTopicAuthorizedOperations(authHelper.authorizedOperations(abstractRequest,
>  new Resource(TOPIC, topicData.name()))));
> +
> +        response.topics().addAll(unauthorizedForDescribeTopicMetadata);
> +        return response;
> +    }
> +
> +    private DescribeTopicPartitionsResponseTopic 
> describeTopicPartitionsResponseTopic(
> +        Errors error,
> +        String topic,
> +        Uuid topicId,
> +        Boolean isInternal,
> +        List<DescribeTopicPartitionsResponsePartition> partitionData
> +    ) {
> +        return new DescribeTopicPartitionsResponseTopic()
> +            .setErrorCode(error.code())
> +            .setName(topic)
> +            .setTopicId(topicId)
> +            .setIsInternal(isInternal)
> +            .setPartitions(partitionData);
> +    }
> +}
> diff --git a/core/src/main/scala/kafka/network/RequestConvertToJson.scala 
> b/core/src/main/scala/kafka/network/RequestConvertToJson.scala
> index e626fe3a4de..bf120376342 100644
> --- a/core/src/main/scala/kafka/network/RequestConvertToJson.scala
> +++ b/core/src/main/scala/kafka/network/RequestConvertToJson.scala
> @@ -62,6 +62,7 @@ object RequestConvertToJson {
>        case req: DescribeLogDirsRequest => 
> DescribeLogDirsRequestDataJsonConverter.write(req.data, request.version)
>        case req: DescribeProducersRequest => 
> DescribeProducersRequestDataJsonConverter.write(req.data, request.version)
>        case req: DescribeQuorumRequest => 
> DescribeQuorumRequestDataJsonConverter.write(req.data, request.version)
> +      case res: DescribeTopicPartitionsRequest => 
> DescribeTopicPartitionsRequestDataJsonConverter.write(res.data, 
> request.version)
>        case req: DescribeTransactionsRequest => 
> DescribeTransactionsRequestDataJsonConverter.write(req.data, request.version)
>        case res: DescribeUserScramCredentialsRequest => 
> DescribeUserScramCredentialsRequestDataJsonConverter.write(res.data, 
> request.version)
>        case req: ElectLeadersRequest => 
> ElectLeadersRequestDataJsonConverter.write(req.data, request.version)
> @@ -144,6 +145,7 @@ object RequestConvertToJson {
>        case res: DescribeLogDirsResponse => 
> DescribeLogDirsResponseDataJsonConverter.write(res.data, version)
>        case res: DescribeProducersResponse => 
> DescribeProducersResponseDataJsonConverter.write(res.data, version)
>        case res: DescribeQuorumResponse => 
> DescribeQuorumResponseDataJsonConverter.write(res.data, version)
> +      case res: DescribeTopicPartitionsResponse => 
> DescribeTopicPartitionsResponseDataJsonConverter.write(res.data, version)
>        case res: DescribeTransactionsResponse => 
> DescribeTransactionsResponseDataJsonConverter.write(res.data, version)
>        case res: DescribeUserScramCredentialsResponse => 
> DescribeUserScramCredentialsResponseDataJsonConverter.write(res.data, version)
>        case res: ElectLeadersResponse => 
> ElectLeadersResponseDataJsonConverter.write(res.data, version)
> @@ -182,8 +184,8 @@ object RequestConvertToJson {
>        case res: UnregisterBrokerResponse => 
> UnregisterBrokerResponseDataJsonConverter.write(res.data, version)
>        case res: UpdateFeaturesResponse => 
> UpdateFeaturesResponseDataJsonConverter.write(res.data, version)
>        case res: UpdateMetadataResponse => 
> UpdateMetadataResponseDataJsonConverter.write(res.data, version)
> -      case res: WriteTxnMarkersResponse => 
> WriteTxnMarkersResponseDataJsonConverter.write(res.data, version)
>        case res: VoteResponse => 
> VoteResponseDataJsonConverter.write(res.data, version)
> +      case res: WriteTxnMarkersResponse => 
> WriteTxnMarkersResponseDataJsonConverter.write(res.data, version)
>        case _ => throw new IllegalStateException(s"ApiKey ${response.apiKey} 
> is not currently handled in `response`, the " +
>          "code should be updated to do so.");
>      }
> diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
> b/core/src/main/scala/kafka/server/KafkaApis.scala
> index 93f55f03998..ea66f32890f 100644
> --- a/core/src/main/scala/kafka/server/KafkaApis.scala
> +++ b/core/src/main/scala/kafka/server/KafkaApis.scala
> @@ -22,20 +22,20 @@ import kafka.controller.ReplicaAssignment
>  import kafka.coordinator.transaction.{InitProducerIdResult, 
> TransactionCoordinator}
>  import kafka.network.RequestChannel
>  import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
> -import kafka.server.metadata.ConfigRepository
> +import kafka.server.handlers.DescribeTopicPartitionsRequestHandler
> +import kafka.server.metadata.{ConfigRepository, KRaftMetadataCache}
>  import kafka.utils.Implicits._
>  import kafka.utils.{CoreUtils, Logging}
>  import org.apache.kafka.admin.AdminUtils
>  import org.apache.kafka.clients.admin.AlterConfigOp.OpType
>  import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry, 
> EndpointType}
> -import org.apache.kafka.common.acl.AclOperation._
>  import org.apache.kafka.common.acl.AclOperation
> +import org.apache.kafka.common.acl.AclOperation._
>  import org.apache.kafka.common.config.ConfigResource
>  import org.apache.kafka.common.errors._
>  import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, 
> TRANSACTION_STATE_TOPIC_NAME, isInternal}
>  import org.apache.kafka.common.internals.{FatalExitError, Topic}
> -import 
> org.apache.kafka.common.message.AddPartitionsToTxnResponseData.AddPartitionsToTxnResult
> -import 
> org.apache.kafka.common.message.AddPartitionsToTxnResponseData.AddPartitionsToTxnResultCollection
> +import 
> org.apache.kafka.common.message.AddPartitionsToTxnResponseData.{AddPartitionsToTxnResult,
>  AddPartitionsToTxnResultCollection}
>  import 
> org.apache.kafka.common.message.AlterConfigsResponseData.AlterConfigsResourceResponse
>  import 
> org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.{ReassignablePartitionResponse,
>  ReassignableTopicResponse}
>  import 
> org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult
> @@ -79,8 +79,8 @@ import java.lang.{Long => JLong}
>  import java.nio.ByteBuffer
>  import java.time.Duration
>  import java.util
> -import java.util.concurrent.{CompletableFuture, ConcurrentHashMap}
>  import java.util.concurrent.atomic.AtomicInteger
> +import java.util.concurrent.{CompletableFuture, ConcurrentHashMap}
>  import java.util.{Collections, Optional, OptionalInt}
>  import scala.annotation.nowarn
>  import scala.collection.mutable.ArrayBuffer
> @@ -120,6 +120,11 @@ class KafkaApis(val requestChannel: RequestChannel,
>    val requestHelper = new RequestHandlerHelper(requestChannel, quotas, time)
>    val aclApis = new AclApis(authHelper, authorizer, requestHelper, "broker", 
> config)
>    val configManager = new ConfigAdminManager(brokerId, config, 
> configRepository)
> +  val describeTopicPartitionsRequestHandler : 
> Option[DescribeTopicPartitionsRequestHandler] = metadataCache match {
> +    case kRaftMetadataCache: KRaftMetadataCache =>
> +      Some(new DescribeTopicPartitionsRequestHandler(kRaftMetadataCache, 
> authHelper, config))
> +    case _ => None
> +  }
>  
>    def close(): Unit = {
>      aclApis.close()
> @@ -247,6 +252,7 @@ class KafkaApis(val requestChannel: RequestChannel,
>          case ApiKeys.DESCRIBE_QUORUM => forwardToControllerOrFail(request)
>          case ApiKeys.CONSUMER_GROUP_HEARTBEAT => 
> handleConsumerGroupHeartbeat(request).exceptionally(handleError)
>          case ApiKeys.CONSUMER_GROUP_DESCRIBE => 
> handleConsumerGroupDescribe(request).exceptionally(handleError)
> +        case ApiKeys.DESCRIBE_TOPIC_PARTITIONS => 
> handleDescribeTopicPartitionsRequest(request)
>          case ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS => 
> handleGetTelemetrySubscriptionsRequest(request)
>          case ApiKeys.PUSH_TELEMETRY => handlePushTelemetryRequest(request)
>          case ApiKeys.LIST_CLIENT_METRICS_RESOURCES => 
> handleListClientMetricsResources(request)
> @@ -1409,6 +1415,22 @@ class KafkaApis(val requestChannel: RequestChannel,
>        ))
>    }
>  
> +  def handleDescribeTopicPartitionsRequest(request: RequestChannel.Request): 
> Unit = {
> +    describeTopicPartitionsRequestHandler match {
> +      case Some(handler) => {
> +        val response = handler.handleDescribeTopicPartitionsRequest(request)
> +        trace("Sending topic partitions metadata %s for correlation id %d to 
> client %s".format(response.topics().asScala.mkString(","),
> +          request.header.correlationId, request.header.clientId))
> +
> +        requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs 
> => {
> +          response.setThrottleTimeMs(requestThrottleMs)
> +          new DescribeTopicPartitionsResponse(response)
> +        })
> +      }
> +      case None => throw new InvalidRequestException("ZK cluster does not 
> handle DescribeTopicPartitions request")
> +    }
> +  }
> +
>    /**
>     * Handle an offset fetch request
>     */
> diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
> b/core/src/main/scala/kafka/server/KafkaConfig.scala
> index fec8519c928..c8910a5ccf8 100755
> --- a/core/src/main/scala/kafka/server/KafkaConfig.scala
> +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
> @@ -330,6 +330,9 @@ object KafkaConfig {
>    val MaxIncrementalFetchSessionCacheSlots = 
> "max.incremental.fetch.session.cache.slots"
>    val FetchMaxBytes = "fetch.max.bytes"
>  
> +  /** ********* Request Limit Configuration **************/
> +  val MaxRequestPartitionSizeLimit = "max.request.partition.size.limit"
> +
>    /** ********* Quota Configuration ***********/
>    val NumQuotaSamplesProp = "quota.window.num"
>    val NumReplicationQuotaSamplesProp = "replication.quota.window.num"
> @@ -827,6 +830,9 @@ object KafkaConfig {
>    val MaxIncrementalFetchSessionCacheSlotsDoc = "The maximum number of 
> incremental fetch sessions that we will maintain."
>    val FetchMaxBytesDoc = "The maximum number of bytes we will return for a 
> fetch request. Must be at least 1024."
>  
> +  /** ********* Request Limit Configuration **************/
> +  val MaxRequestPartitionSizeLimitDoc = "The maximum number of partitions 
> can be served in one request."
> +
>    /** ********* Quota Configuration ***********/
>    val NumQuotaSamplesDoc = "The number of samples to retain in memory for 
> client quotas"
>    val NumReplicationQuotaSamplesDoc = "The number of samples to retain in 
> memory for replication quotas"
> @@ -1184,6 +1190,9 @@ object KafkaConfig {
>        .define(MaxIncrementalFetchSessionCacheSlots, INT, 
> Defaults.MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS, atLeast(0), MEDIUM, 
> MaxIncrementalFetchSessionCacheSlotsDoc)
>        .define(FetchMaxBytes, INT, Defaults.FETCH_MAX_BYTES, atLeast(1024), 
> MEDIUM, FetchMaxBytesDoc)
>  
> +      /** ********* Request Limit Configuration ***********/
> +      .define(MaxRequestPartitionSizeLimit, INT, 
> Defaults.MAX_REQUEST_PARTITION_SIZE_LIMIT, atLeast(1), MEDIUM, 
> MaxRequestPartitionSizeLimitDoc)
> +
>        /** ********* Kafka Metrics Configuration ***********/
>        .define(MetricNumSamplesProp, INT, Defaults.METRIC_NUM_SAMPLES, 
> atLeast(1), LOW, MetricNumSamplesDoc)
>        .define(MetricSampleWindowMsProp, LONG, 
> Defaults.METRIC_SAMPLE_WINDOW_MS, atLeast(1), LOW, MetricSampleWindowMsDoc)
> @@ -1882,6 +1891,9 @@ class KafkaConfig private(doLog: Boolean, val props: 
> java.util.Map[_, _], dynami
>    val maxIncrementalFetchSessionCacheSlots = 
> getInt(KafkaConfig.MaxIncrementalFetchSessionCacheSlots)
>    val fetchMaxBytes = getInt(KafkaConfig.FetchMaxBytes)
>  
> +  /** ********* Request Limit Configuration ***********/
> +  val maxRequestPartitionSizeLimit = 
> getInt(KafkaConfig.MaxRequestPartitionSizeLimit)
> +
>    val deleteTopicEnable = getBoolean(KafkaConfig.DeleteTopicEnableProp)
>    def compressionType = getString(KafkaConfig.CompressionTypeProp)
>  
> diff --git 
> a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala 
> b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
> index 485bba8812c..91fd95f60f3 100644
> --- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
> +++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
> @@ -21,27 +21,29 @@ import kafka.controller.StateChangeLogger
>  import kafka.server.{CachedControllerId, KRaftCachedControllerId, 
> MetadataCache}
>  import kafka.utils.Logging
>  import org.apache.kafka.admin.BrokerMetadata
> +import org.apache.kafka.common._
> +import org.apache.kafka.common.config.ConfigResource
> +import org.apache.kafka.common.errors.InvalidTopicException
>  import org.apache.kafka.common.internals.Topic
> +import 
> org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.{Cursor, 
> DescribeTopicPartitionsResponsePartition, 
> DescribeTopicPartitionsResponseTopic}
>  import 
> org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition,
>  MetadataResponseTopic}
> -import org.apache.kafka.common.{Cluster, Node, PartitionInfo, 
> TopicPartition, Uuid}
>  import 
> org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState
> +import org.apache.kafka.common.message._
>  import org.apache.kafka.common.network.ListenerName
>  import org.apache.kafka.common.protocol.Errors
>  import org.apache.kafka.common.requests.MetadataResponse
>  import org.apache.kafka.image.MetadataImage
> -
> -import java.util
> -import java.util.{Collections, Properties}
> -import java.util.concurrent.ThreadLocalRandom
> -import org.apache.kafka.common.config.ConfigResource
> -import org.apache.kafka.common.message.{DescribeClientQuotasRequestData, 
> DescribeClientQuotasResponseData}
> -import 
> org.apache.kafka.common.message.{DescribeUserScramCredentialsRequestData, 
> DescribeUserScramCredentialsResponseData}
>  import org.apache.kafka.metadata.{BrokerRegistration, PartitionRegistration, 
> Replicas}
>  import org.apache.kafka.server.common.{Features, MetadataVersion}
>  
> +import java.util
> +import java.util.concurrent.ThreadLocalRandom
> +import java.util.{Collections, Properties}
> +import scala.collection.mutable.ListBuffer
>  import scala.collection.{Map, Seq, Set, mutable}
> -import scala.jdk.CollectionConverters._
>  import scala.compat.java8.OptionConverters._
> +import scala.jdk.CollectionConverters._
> +import scala.util.control.Breaks._
>  
>  
>  class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with 
> Logging with ConfigRepository {
> @@ -140,6 +142,73 @@ class KRaftMetadataCache(val brokerId: Int) extends 
> MetadataCache with Logging w
>      }
>    }
>  
> +  /**
> +   * Return topic partition metadata for the given topic, listener and index 
> range. Also, return the next partition
> +   * index that is not included in the result.
> +   *
> +   * @param image                       The metadata image
> +   * @param topicName                   The name of the topic.
> +   * @param listenerName                The listener name.
> +   * @param startIndex                  The smallest index of the partitions 
> to be included in the result.
> +   * @param upperIndex                  The upper limit of the index of the 
> partitions to be included in the result.
> +   *                                    Note that, the upper index can be 
> larger than the largest partition index in
> +   *                                    this topic.
> +   * @return                            A collection of topic partition 
> metadata and next partition index (-1 means
> +   *                                    no next partition).
> +   */
> +  private def getPartitionMetadataForDescribeTopicResponse(
> +    image: MetadataImage,
> +    topicName: String,
> +    listenerName: ListenerName,
> +    startIndex: Int,
> +    maxCount: Int
> +  ): (Option[List[DescribeTopicPartitionsResponsePartition]], Int) = {
> +    Option(image.topics().getTopic(topicName)) match {
> +      case None => (None, -1)
> +      case Some(topic) => {
> +        val result = new 
> ListBuffer[DescribeTopicPartitionsResponsePartition]()
> +        val partitions = topic.partitions().keySet()
> +        val upperIndex = topic.partitions().size().min(startIndex + maxCount)
> +        val nextIndex = if (upperIndex < partitions.size()) upperIndex else 
> -1
> +        for (partitionId <- startIndex until upperIndex) {
> +          topic.partitions().get(partitionId) match {
> +            case partition : PartitionRegistration => {
> +              val filteredReplicas = maybeFilterAliveReplicas(image, 
> partition.replicas,
> +                listenerName, false)
> +              val filteredIsr = maybeFilterAliveReplicas(image, 
> partition.isr, listenerName, false)
> +              val offlineReplicas = getOfflineReplicas(image, partition, 
> listenerName)
> +              val maybeLeader = getAliveEndpoint(image, partition.leader, 
> listenerName)
> +              maybeLeader match {
> +                case None =>
> +                  result.append(new 
> DescribeTopicPartitionsResponsePartition()
> +                    .setPartitionIndex(partitionId)
> +                    .setLeaderId(MetadataResponse.NO_LEADER_ID)
> +                    .setLeaderEpoch(partition.leaderEpoch)
> +                    .setReplicaNodes(filteredReplicas)
> +                    .setIsrNodes(filteredIsr)
> +                    .setOfflineReplicas(offlineReplicas)
> +                    
> .setEligibleLeaderReplicas(Replicas.toList(partition.elr))
> +                    
> .setLastKnownElr(Replicas.toList(partition.lastKnownElr)))
> +                case Some(leader) =>
> +                  result.append(new 
> DescribeTopicPartitionsResponsePartition()
> +                    .setPartitionIndex(partitionId)
> +                    .setLeaderId(leader.id())
> +                    .setLeaderEpoch(partition.leaderEpoch)
> +                    .setReplicaNodes(filteredReplicas)
> +                    .setIsrNodes(filteredIsr)
> +                    .setOfflineReplicas(offlineReplicas)
> +                    
> .setEligibleLeaderReplicas(Replicas.toList(partition.elr))
> +                    
> .setLastKnownElr(Replicas.toList(partition.lastKnownElr)))
> +              }
> +            }
> +            case _ => warn(s"The partition $partitionId does not exist for 
> $topicName")
> +          }
> +        }
> +        (Some(result.toList), nextIndex)
> +      }
> +    }
> +  }
> +
>    private def getOfflineReplicas(image: MetadataImage,
>                                   partition: PartitionRegistration,
>                                   listenerName: ListenerName): 
> util.List[Integer] = {
> @@ -189,6 +258,86 @@ class KRaftMetadataCache(val brokerId: Int) extends 
> MetadataCache with Logging w
>      }
>    }
>  
> +  /**
> +   * Get the topic metadata for the given topics.
> +   *
> +   * The quota is used to limit the number of partitions to return. The 
> NextTopicPartition field points to the first
> +   * partition can't be returned due the limit.
> +   * If a topic can't return any partition due to quota limit reached, this 
> topic will not be included in the response.
> +   *
> +   * Note, the topics should be sorted in alphabetical order. The topics in 
> the DescribeTopicPartitionsResponseData
> +   * will also be sorted in alphabetical order.
> +   *
> +   * @param topics                        The iterator of topics and their 
> corresponding first partition id to fetch.
> +   * @param listenerName                  The listener name.
> +   * @param firstTopicPartitionStartIndex The start partition index for the 
> first topic
> +   * @param maximumNumberOfPartitions     The max number of partitions to 
> return.
> +   * @param ignoreTopicsWithExceptions    Whether ignore the topics with 
> exception.
> +   */
> +  def getTopicMetadataForDescribeTopicResponse(
> +    topics: Iterator[String],
> +    listenerName: ListenerName,
> +    topicPartitionStartIndex: String => Int,
> +    maximumNumberOfPartitions: Int,
> +    ignoreTopicsWithExceptions: Boolean
> +  ): DescribeTopicPartitionsResponseData = {
> +    val image = _currentImage
> +    var remaining = maximumNumberOfPartitions
> +    val result = new DescribeTopicPartitionsResponseData()
> +    breakable {
> +      topics.foreach { topicName =>
> +        if (remaining > 0) {
> +          val (partitionResponse, nextPartition) =
> +            getPartitionMetadataForDescribeTopicResponse(
> +              image, topicName, listenerName, 
> topicPartitionStartIndex(topicName), remaining
> +            )
> +          partitionResponse.map(partitions => {
> +            val response = new DescribeTopicPartitionsResponseTopic()
> +              .setErrorCode(Errors.NONE.code)
> +              .setName(topicName)
> +              
> .setTopicId(Option(image.topics().getTopic(topicName).id()).getOrElse(Uuid.ZERO_UUID))
> +              .setIsInternal(Topic.isInternal(topicName))
> +              .setPartitions(partitions.asJava)
> +            result.topics().add(response)
> +
> +            if (nextPartition != -1) {
> +              result.setNextCursor(new Cursor()
> +                .setTopicName(topicName)
> +                .setPartitionIndex(nextPartition)
> +              )
> +              break()
> +            }
> +            remaining -= partitions.size
> +          })
> +
> +          if (!ignoreTopicsWithExceptions && !partitionResponse.isDefined) {
> +            val error = try {
> +              Topic.validate(topicName)
> +              Errors.UNKNOWN_TOPIC_OR_PARTITION
> +            } catch {
> +              case _: InvalidTopicException =>
> +                Errors.INVALID_TOPIC_EXCEPTION
> +            }
> +            result.topics().add(new DescribeTopicPartitionsResponseTopic()
> +              .setErrorCode(error.code())
> +              .setName(topicName)
> +              .setTopicId(getTopicId(topicName))
> +              .setIsInternal(Topic.isInternal(topicName)))
> +          }
> +        } else if (remaining == 0) {
> +          // The cursor should point to the beginning of the current topic. 
> All the partitions in the previous topic
> +          // should be fulfilled. Note that, if a partition is pointed in 
> the NextTopicPartition, it does not mean
> +          // this topic exists.
> +          result.setNextCursor(new Cursor()
> +            .setTopicName(topicName)
> +            .setPartitionIndex(0))
> +          break()
> +        }
> +      }
> +    }
> +    result
> +  }
> +
>    override def getAllTopics(): Set[String] = 
> _currentImage.topics().topicsByName().keySet().asScala
>  
>    override def getTopicPartitions(topicName: String): Set[TopicPartition] = {
> diff --git 
> a/core/src/test/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandlerTest.java
>  
> b/core/src/test/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandlerTest.java
> new file mode 100644
> index 00000000000..2596054ffee
> --- /dev/null
> +++ 
> b/core/src/test/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandlerTest.java
> @@ -0,0 +1,545 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements. See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + *    http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +package kafka.server.handlers;
> +
> +import kafka.network.RequestChannel;
> +import kafka.server.AuthHelper;
> +import kafka.server.KafkaConfig;
> +import kafka.server.metadata.KRaftMetadataCache;
> +import kafka.utils.TestUtils;
> +import org.apache.kafka.common.Uuid;
> +import org.apache.kafka.common.acl.AclOperation;
> +import org.apache.kafka.common.errors.InvalidRequestException;
> +import org.apache.kafka.common.errors.SerializationException;
> +import org.apache.kafka.common.memory.MemoryPool;
> +import org.apache.kafka.common.message.DescribeTopicPartitionsRequestData;
> +import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;
> +import 
> org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopic;
> +import org.apache.kafka.common.message.UpdateMetadataRequestData;
> +import 
> org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataBroker;
> +import org.apache.kafka.common.metadata.PartitionRecord;
> +import org.apache.kafka.common.metadata.RegisterBrokerRecord;
> +import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpoint;
> +import 
> org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpointCollection;
> +import org.apache.kafka.common.metadata.TopicRecord;
> +import org.apache.kafka.common.network.ClientInformation;
> +import org.apache.kafka.common.network.ListenerName;
> +import org.apache.kafka.common.protocol.ApiMessage;
> +import org.apache.kafka.common.protocol.Errors;
> +import org.apache.kafka.common.requests.AbstractRequest;
> +import org.apache.kafka.common.requests.DescribeTopicPartitionsRequest;
> +import org.apache.kafka.common.requests.RequestContext;
> +import org.apache.kafka.common.requests.RequestHeader;
> +import org.apache.kafka.common.resource.PatternType;
> +import org.apache.kafka.common.resource.ResourcePattern;
> +import org.apache.kafka.common.resource.ResourceType;
> +import org.apache.kafka.common.security.auth.KafkaPrincipal;
> +import org.apache.kafka.common.security.auth.KafkaPrincipalSerde;
> +import org.apache.kafka.common.security.auth.SecurityProtocol;
> +import org.apache.kafka.common.utils.SecurityUtils;
> +import org.apache.kafka.common.utils.Utils;
> +import org.apache.kafka.image.ClusterImage;
> +import org.apache.kafka.image.MetadataDelta;
> +import org.apache.kafka.image.MetadataImage;
> +import org.apache.kafka.image.MetadataProvenance;
> +import org.apache.kafka.metadata.LeaderRecoveryState;
> +import org.apache.kafka.server.authorizer.Action;
> +import org.apache.kafka.server.authorizer.AuthorizationResult;
> +import org.apache.kafka.server.authorizer.Authorizer;
> +import org.apache.kafka.server.common.MetadataVersion;
> +import org.junit.jupiter.api.Test;
> +
> +import java.net.InetAddress;
> +import java.net.UnknownHostException;
> +import java.nio.ByteBuffer;
> +import java.util.Arrays;
> +import java.util.HashMap;
> +import java.util.List;
> +import java.util.Map;
> +import java.util.Optional;
> +import java.util.Properties;
> +import java.util.stream.Collectors;
> +
> +import static org.junit.jupiter.api.Assertions.assertEquals;
> +import static org.junit.jupiter.api.Assertions.assertNotEquals;
> +import static org.junit.jupiter.api.Assertions.assertTrue;
> +import static org.mockito.ArgumentMatchers.any;
> +import static org.mockito.ArgumentMatchers.argThat;
> +import static org.mockito.Mockito.mock;
> +import static org.mockito.Mockito.when;
> +
> +class DescribeTopicPartitionsRequestHandlerTest {
> +    private int brokerId = 1;
> +    private RequestChannel.Metrics requestChannelMetrics = 
> mock(RequestChannel.Metrics.class);
> +    private KafkaPrincipalSerde kafkaPrincipalSerde = new 
> KafkaPrincipalSerde() {
> +        @Override
> +        public byte[] serialize(KafkaPrincipal principal) throws 
> SerializationException {
> +            return Utils.utf8(principal.toString());
> +        }
> +
> +        @Override
> +        public KafkaPrincipal deserialize(byte[] bytes) throws 
> SerializationException {
> +            return SecurityUtils.parseKafkaPrincipal(Utils.utf8(bytes));
> +        }
> +    };
> +
> +    ListenerName plaintextListener = 
> ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT);
> +    UpdateMetadataBroker broker = new UpdateMetadataBroker()
> +        .setId(0)
> +        .setRack("rack")
> +        .setEndpoints(Arrays.asList(
> +            new UpdateMetadataRequestData.UpdateMetadataEndpoint()
> +                .setHost("broker0")
> +                .setPort(9092)
> +                .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
> +                .setListener(plaintextListener.value())
> +        ));
> +
> +    @Test
> +    void testDescribeTopicPartitionsRequest() {
> +        // 1. Set up authorizer
> +        Authorizer authorizer = mock(Authorizer.class);
> +        String unauthorizedTopic = "unauthorized-topic";
> +        String authorizedTopic = "authorized-topic";
> +        String authorizedNonExistTopic = "authorized-non-exist";
> +
> +        Action expectedActions1 = new Action(AclOperation.DESCRIBE, new 
> ResourcePattern(ResourceType.TOPIC, unauthorizedTopic, PatternType.LITERAL), 
> 1, true, true);
> +        Action expectedActions2 = new Action(AclOperation.DESCRIBE, new 
> ResourcePattern(ResourceType.TOPIC, authorizedTopic, PatternType.LITERAL), 1, 
> true, true);
> +        Action expectedActions3 = new Action(AclOperation.DESCRIBE, new 
> ResourcePattern(ResourceType.TOPIC, authorizedNonExistTopic, 
> PatternType.LITERAL), 1, true, true);
> +
> +        // Here we need to use AuthHelperTest.matchSameElements instead of 
> EasyMock.eq since the order of the request is unknown
> +        when(authorizer.authorize(any(RequestContext.class), argThat(t ->
> +            t.contains(expectedActions1) || t.contains(expectedActions2) || 
> t.contains(expectedActions3))))
> +            .thenAnswer(invocation -> {
> +                List<Action> actions = (List<Action>) 
> invocation.getArgument(1);
> +                return actions.stream().map(action -> {
> +                    if 
> (action.resourcePattern().name().startsWith("authorized"))
> +                        return AuthorizationResult.ALLOWED;
> +                    else
> +                        return AuthorizationResult.DENIED;
> +                }).collect(Collectors.toList());
> +            });
> +
> +        // 2. Set up MetadataCache
> +        Uuid authorizedTopicId = Uuid.randomUuid();
> +        Uuid unauthorizedTopicId = Uuid.randomUuid();
> +
> +        Map<String, Uuid> topicIds = new HashMap<>();
> +        topicIds.put(authorizedTopic, authorizedTopicId);
> +        topicIds.put(unauthorizedTopic, unauthorizedTopicId);
> +
> +        BrokerEndpointCollection collection = new BrokerEndpointCollection();
> +        collection.add(new BrokerEndpoint()
> +            .setName(broker.endpoints().get(0).listener())
> +            .setHost(broker.endpoints().get(0).host())
> +            .setPort(broker.endpoints().get(0).port())
> +            
> .setSecurityProtocol(broker.endpoints().get(0).securityProtocol())
> +        );
> +        List<ApiMessage> records = Arrays.asList(
> +            new RegisterBrokerRecord()
> +                .setBrokerId(broker.id())
> +                .setBrokerEpoch(0)
> +                .setIncarnationId(Uuid.randomUuid())
> +                .setEndPoints(collection)
> +                .setRack(broker.rack())
> +                .setFenced(false),
> +            new 
> TopicRecord().setName(authorizedTopic).setTopicId(topicIds.get(authorizedTopic)),
> +            new 
> TopicRecord().setName(unauthorizedTopic).setTopicId(topicIds.get(unauthorizedTopic)),
> +            new PartitionRecord()
> +                .setTopicId(authorizedTopicId)
> +                .setPartitionId(1)
> +                .setReplicas(Arrays.asList(0, 1, 2))
> +                .setLeader(0)
> +                .setIsr(Arrays.asList(0))
> +                .setEligibleLeaderReplicas(Arrays.asList(1))
> +                .setLastKnownElr(Arrays.asList(2))
> +                .setLeaderEpoch(0)
> +                .setPartitionEpoch(1)
> +                
> .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()),
> +            new PartitionRecord()
> +                .setTopicId(authorizedTopicId)
> +                .setPartitionId(0)
> +                .setReplicas(Arrays.asList(0, 1, 2))
> +                .setLeader(0)
> +                .setIsr(Arrays.asList(0))
> +                .setEligibleLeaderReplicas(Arrays.asList(1))
> +                .setLastKnownElr(Arrays.asList(2))
> +                .setLeaderEpoch(0)
> +                .setPartitionEpoch(1)
> +                
> .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()),
> +            new PartitionRecord()
> +                .setTopicId(unauthorizedTopicId)
> +                .setPartitionId(0)
> +                .setReplicas(Arrays.asList(0, 1, 3))
> +                .setLeader(0)
> +                .setIsr(Arrays.asList(0))
> +                .setEligibleLeaderReplicas(Arrays.asList(1))
> +                .setLastKnownElr(Arrays.asList(3))
> +                .setLeaderEpoch(0)
> +                .setPartitionEpoch(2)
> +                
> .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value())
> +        );
> +        KRaftMetadataCache metadataCache = new KRaftMetadataCache(0);
> +        updateKraftMetadataCache(metadataCache, records);
> +        DescribeTopicPartitionsRequestHandler handler =
> +            new DescribeTopicPartitionsRequestHandler(metadataCache, new 
> AuthHelper(scala.Option.apply(authorizer)), createKafkaDefaultConfig());
> +
> +        // 3.1 Basic test
> +        DescribeTopicPartitionsRequest describeTopicPartitionsRequest = new 
> DescribeTopicPartitionsRequest(
> +            new DescribeTopicPartitionsRequestData()
> +                .setTopics(Arrays.asList(
> +                    new 
> DescribeTopicPartitionsRequestData.TopicRequest().setName(authorizedTopic),
> +                    new 
> DescribeTopicPartitionsRequestData.TopicRequest().setName(unauthorizedTopic)
> +                ))
> +        );
> +        RequestChannel.Request request;
> +        try {
> +            request = buildRequest(describeTopicPartitionsRequest, 
> plaintextListener);
> +        } catch (Exception e) {
> +            assertTrue(false, e.getMessage());
> +            return;
> +        }
> +        DescribeTopicPartitionsResponseData response = 
> handler.handleDescribeTopicPartitionsRequest(request);
> +        List<DescribeTopicPartitionsResponseTopic> topics = 
> response.topics().valuesList();
> +        assertEquals(2, topics.size());
> +        DescribeTopicPartitionsResponseTopic topicToCheck = topics.get(0);
> +        assertEquals(authorizedTopicId, topicToCheck.topicId());
> +        assertEquals(Errors.NONE.code(), topicToCheck.errorCode());
> +        assertEquals(authorizedTopic, topicToCheck.name());
> +        assertEquals(2, topicToCheck.partitions().size());
> +
> +        topicToCheck = topics.get(1);
> +        assertNotEquals(unauthorizedTopicId, topicToCheck.topicId());
> +        assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(), 
> topicToCheck.errorCode());
> +        assertEquals(unauthorizedTopic, topicToCheck.name());
> +
> +        // 3.2 With cursor
> +        describeTopicPartitionsRequest = new 
> DescribeTopicPartitionsRequest(new DescribeTopicPartitionsRequestData()
> +            .setTopics(Arrays.asList(
> +                new 
> DescribeTopicPartitionsRequestData.TopicRequest().setName(authorizedTopic),
> +                new 
> DescribeTopicPartitionsRequestData.TopicRequest().setName(unauthorizedTopic)
> +                ))
> +            .setCursor(new 
> DescribeTopicPartitionsRequestData.Cursor().setTopicName(authorizedTopic).setPartitionIndex(1))
> +        );
> +
> +        try {
> +            request = buildRequest(describeTopicPartitionsRequest, 
> plaintextListener);
> +        } catch (Exception e) {
> +            assertTrue(false, e.getMessage());
> +            return;
> +        }
> +        response = handler.handleDescribeTopicPartitionsRequest(request);
> +        topics = response.topics().valuesList();
> +        assertEquals(2, topics.size());
> +        topicToCheck = topics.get(0);
> +        assertEquals(authorizedTopicId, topicToCheck.topicId());
> +        assertEquals(Errors.NONE.code(), topicToCheck.errorCode());
> +        assertEquals(authorizedTopic, topicToCheck.name());
> +        assertEquals(1, topicToCheck.partitions().size());
> +
> +        topicToCheck = topics.get(1);
> +        assertNotEquals(unauthorizedTopicId, topicToCheck.topicId());
> +        assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(), 
> topicToCheck.errorCode());
> +        assertEquals(unauthorizedTopic, topicToCheck.name());
> +
> +        // 3.3 Fetch all topics
> +        describeTopicPartitionsRequest = new 
> DescribeTopicPartitionsRequest(new DescribeTopicPartitionsRequestData());
> +        try {
> +            request = buildRequest(describeTopicPartitionsRequest, 
> plaintextListener);
> +        } catch (Exception e) {
> +            assertTrue(false, e.getMessage());
> +            return;
> +        }
> +        response = handler.handleDescribeTopicPartitionsRequest(request);
> +        topics = response.topics().valuesList();
> +        assertEquals(1, topics.size());
> +        topicToCheck = topics.get(0);
> +        assertEquals(authorizedTopicId, topicToCheck.topicId());
> +        assertEquals(Errors.NONE.code(), topicToCheck.errorCode());
> +        assertEquals(authorizedTopic, topicToCheck.name());
> +        assertEquals(2, topicToCheck.partitions().size());
> +
> +        // 3.4 Fetch all topics with cursor
> +        describeTopicPartitionsRequest = new DescribeTopicPartitionsRequest(
> +            new DescribeTopicPartitionsRequestData().setCursor(
> +                new 
> DescribeTopicPartitionsRequestData.Cursor().setTopicName(authorizedTopic).setPartitionIndex(1)));
> +        try {
> +            request = buildRequest(describeTopicPartitionsRequest, 
> plaintextListener);
> +        } catch (Exception e) {
> +            assertTrue(false, e.getMessage());
> +            return;
> +        }
> +        response = handler.handleDescribeTopicPartitionsRequest(request);
> +        topics = response.topics().valuesList();
> +        assertEquals(1, topics.size());
> +        topicToCheck = topics.get(0);
> +        assertEquals(authorizedTopicId, topicToCheck.topicId());
> +        assertEquals(Errors.NONE.code(), topicToCheck.errorCode());
> +        assertEquals(authorizedTopic, topicToCheck.name());
> +        assertEquals(1, topicToCheck.partitions().size());
> +
> +        // 3.5 Fetch all topics with limit
> +        describeTopicPartitionsRequest = new DescribeTopicPartitionsRequest(
> +                new 
> DescribeTopicPartitionsRequestData().setResponsePartitionLimit(1)
> +        );
> +        try {
> +            request = buildRequest(describeTopicPartitionsRequest, 
> plaintextListener);
> +        } catch (Exception e) {
> +            assertTrue(false, e.getMessage());
> +            return;
> +        }
> +        response = handler.handleDescribeTopicPartitionsRequest(request);
> +        topics = response.topics().valuesList();
> +        assertEquals(1, topics.size());
> +        topicToCheck = topics.get(0);
> +        assertEquals(authorizedTopicId, topicToCheck.topicId());
> +        assertEquals(Errors.NONE.code(), topicToCheck.errorCode());
> +        assertEquals(authorizedTopic, topicToCheck.name());
> +        assertEquals(1, topicToCheck.partitions().size());
> +        assertEquals(authorizedTopic, response.nextCursor().topicName());
> +        assertEquals(1, response.nextCursor().partitionIndex());
> +    }
> +
> +    @Test
> +    void testDescribeTopicPartitionsRequestWithEdgeCases() {
> +        // 1. Set up authorizer
> +        Authorizer authorizer = mock(Authorizer.class);
> +        String authorizedTopic = "authorized-topic1";
> +        String authorizedTopic2 = "authorized-topic2";
> +
> +        Action expectedActions1 = new Action(AclOperation.DESCRIBE, new 
> ResourcePattern(ResourceType.TOPIC, authorizedTopic, PatternType.LITERAL), 1, 
> true, true);
> +        Action expectedActions2 = new Action(AclOperation.DESCRIBE, new 
> ResourcePattern(ResourceType.TOPIC, authorizedTopic2, PatternType.LITERAL), 
> 1, true, true);
> +
> +        // Here we need to use AuthHelperTest.matchSameElements instead of 
> EasyMock.eq since the order of the request is unknown
> +        when(authorizer.authorize(any(RequestContext.class), argThat(t ->
> +            t.contains(expectedActions1) || t.contains(expectedActions2))))
> +            .thenAnswer(invocation -> {
> +                List<Action> actions = (List<Action>) 
> invocation.getArgument(1);
> +                return actions.stream().map(action -> {
> +                    if 
> (action.resourcePattern().name().startsWith("authorized"))
> +                        return AuthorizationResult.ALLOWED;
> +                    else
> +                        return AuthorizationResult.DENIED;
> +                }).collect(Collectors.toList());
> +            });
> +
> +        // 2. Set up MetadataCache
> +        Uuid authorizedTopicId = Uuid.randomUuid();
> +        Uuid authorizedTopicId2 = Uuid.randomUuid();
> +
> +        Map<String, Uuid> topicIds = new HashMap<>();
> +        topicIds.put(authorizedTopic, authorizedTopicId);
> +        topicIds.put(authorizedTopic2, authorizedTopicId2);
> +
> +        BrokerEndpointCollection collection = new BrokerEndpointCollection();
> +        collection.add(new BrokerEndpoint()
> +                .setName(broker.endpoints().get(0).listener())
> +                .setHost(broker.endpoints().get(0).host())
> +                .setPort(broker.endpoints().get(0).port())
> +                
> .setSecurityProtocol(broker.endpoints().get(0).securityProtocol())
> +        );
> +        List<ApiMessage> records = Arrays.asList(
> +            new RegisterBrokerRecord()
> +                .setBrokerId(broker.id())
> +                .setBrokerEpoch(0)
> +                .setIncarnationId(Uuid.randomUuid())
> +                .setEndPoints(collection)
> +                .setRack(broker.rack())
> +                .setFenced(false),
> +            new 
> TopicRecord().setName(authorizedTopic).setTopicId(topicIds.get(authorizedTopic)),
> +            new 
> TopicRecord().setName(authorizedTopic2).setTopicId(topicIds.get(authorizedTopic2)),
> +            new PartitionRecord()
> +                .setTopicId(authorizedTopicId)
> +                .setPartitionId(0)
> +                .setReplicas(Arrays.asList(0, 1, 2))
> +                .setLeader(0)
> +                .setIsr(Arrays.asList(0))
> +                .setEligibleLeaderReplicas(Arrays.asList(1))
> +                .setLastKnownElr(Arrays.asList(2))
> +                .setLeaderEpoch(0)
> +                .setPartitionEpoch(1)
> +                
> .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()),
> +            new PartitionRecord()
> +                .setTopicId(authorizedTopicId)
> +                .setPartitionId(1)
> +                .setReplicas(Arrays.asList(0, 1, 2))
> +                .setLeader(0)
> +                .setIsr(Arrays.asList(0))
> +                .setEligibleLeaderReplicas(Arrays.asList(1))
> +                .setLastKnownElr(Arrays.asList(2))
> +                .setLeaderEpoch(0)
> +                .setPartitionEpoch(1)
> +                
> .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()),
> +            new PartitionRecord()
> +                .setTopicId(authorizedTopicId2)
> +                .setPartitionId(0)
> +                .setReplicas(Arrays.asList(0, 1, 3))
> +                .setLeader(0)
> +                .setIsr(Arrays.asList(0))
> +                .setEligibleLeaderReplicas(Arrays.asList(1))
> +                .setLastKnownElr(Arrays.asList(3))
> +                .setLeaderEpoch(0)
> +                .setPartitionEpoch(2)
> +                
> .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value())
> +        );
> +        KRaftMetadataCache metadataCache = new KRaftMetadataCache(0);
> +        updateKraftMetadataCache(metadataCache, records);
> +        DescribeTopicPartitionsRequestHandler handler =
> +            new DescribeTopicPartitionsRequestHandler(metadataCache, new 
> AuthHelper(scala.Option.apply(authorizer)), createKafkaDefaultConfig());
> +
> +        // 3.1 With cursor point to the first one
> +        DescribeTopicPartitionsRequest describeTopicPartitionsRequest = new 
> DescribeTopicPartitionsRequest(new DescribeTopicPartitionsRequestData()
> +            .setTopics(Arrays.asList(
> +                new 
> DescribeTopicPartitionsRequestData.TopicRequest().setName(authorizedTopic),
> +                new 
> DescribeTopicPartitionsRequestData.TopicRequest().setName(authorizedTopic2)
> +                ))
> +            .setCursor(new 
> DescribeTopicPartitionsRequestData.Cursor().setTopicName(authorizedTopic).setPartitionIndex(1))
> +        );
> +
> +        RequestChannel.Request request;
> +        try {
> +            request = buildRequest(describeTopicPartitionsRequest, 
> plaintextListener);
> +        } catch (Exception e) {
> +            assertTrue(false, e.getMessage());
> +            return;
> +        }
> +        DescribeTopicPartitionsResponseData response = 
> handler.handleDescribeTopicPartitionsRequest(request);
> +        List<DescribeTopicPartitionsResponseTopic> topics = 
> response.topics().valuesList();
> +        assertEquals(2, topics.size());
> +        DescribeTopicPartitionsResponseTopic topicToCheck = topics.get(0);
> +        assertEquals(authorizedTopicId, topicToCheck.topicId());
> +        assertEquals(Errors.NONE.code(), topicToCheck.errorCode());
> +        assertEquals(authorizedTopic, topicToCheck.name());
> +        assertEquals(1, topicToCheck.partitions().size());
> +
> +        topicToCheck = topics.get(1);
> +        assertEquals(authorizedTopicId2, topicToCheck.topicId());
> +        assertEquals(Errors.NONE.code(), topicToCheck.errorCode());
> +        assertEquals(authorizedTopic2, topicToCheck.name());
> +        assertEquals(1, topicToCheck.partitions().size());
> +
> +        // 3.2 With cursor point to the second one. The first topic should 
> be ignored.
> +        describeTopicPartitionsRequest = new 
> DescribeTopicPartitionsRequest(new DescribeTopicPartitionsRequestData()
> +            .setTopics(Arrays.asList(
> +                new 
> DescribeTopicPartitionsRequestData.TopicRequest().setName(authorizedTopic),
> +                new 
> DescribeTopicPartitionsRequestData.TopicRequest().setName(authorizedTopic2)
> +                ))
> +            .setCursor(new 
> DescribeTopicPartitionsRequestData.Cursor().setTopicName(authorizedTopic2).setPartitionIndex(0))
> +        );
> +
> +        try {
> +            request = buildRequest(describeTopicPartitionsRequest, 
> plaintextListener);
> +        } catch (Exception e) {
> +            assertTrue(false, e.getMessage());
> +            return;
> +        }
> +        response = handler.handleDescribeTopicPartitionsRequest(request);
> +        topics = response.topics().valuesList();
> +        assertEquals(1, topics.size());
> +        topicToCheck = topics.get(0);
> +        assertEquals(authorizedTopicId2, topicToCheck.topicId());
> +        assertEquals(Errors.NONE.code(), topicToCheck.errorCode());
> +        assertEquals(authorizedTopic2, topicToCheck.name());
> +        assertEquals(1, topicToCheck.partitions().size());
> +
> +        // 3.3 With cursor point to a non existing topic. Exception should 
> be thrown if not querying all the topics.
> +        describeTopicPartitionsRequest = new 
> DescribeTopicPartitionsRequest(new DescribeTopicPartitionsRequestData()
> +            .setTopics(Arrays.asList(
> +                new 
> DescribeTopicPartitionsRequestData.TopicRequest().setName(authorizedTopic),
> +                new 
> DescribeTopicPartitionsRequestData.TopicRequest().setName(authorizedTopic2)
> +                ))
> +            .setCursor(new 
> DescribeTopicPartitionsRequestData.Cursor().setTopicName("Non-existing").setPartitionIndex(0))
> +        );
> +
> +        try {
> +            
> handler.handleDescribeTopicPartitionsRequest(buildRequest(describeTopicPartitionsRequest,
>  plaintextListener));
> +        } catch (Exception e) {
> +            assertTrue(e instanceof InvalidRequestException, e.getMessage());
> +        }
> +    }
> +
> +    void updateKraftMetadataCache(KRaftMetadataCache kRaftMetadataCache, 
> List<ApiMessage> records) {
> +        MetadataImage image = kRaftMetadataCache.currentImage();
> +        MetadataImage partialImage = new MetadataImage(
> +            new MetadataProvenance(100L, 10, 1000L),
> +            image.features(),
> +            ClusterImage.EMPTY,
> +            image.topics(),
> +            image.configs(),
> +            image.clientQuotas(),
> +            image.producerIds(),
> +            image.acls(),
> +            image.scram(),
> +            image.delegationTokens()
> +        );
> +        MetadataDelta delta = new 
> MetadataDelta.Builder().setImage(partialImage).build();
> +        records.stream().forEach(record -> delta.replay(record));
> +        kRaftMetadataCache.setImage(delta.apply(new MetadataProvenance(100L, 
> 10, 1000L)));
> +    }
> +
> +    private RequestChannel.Request buildRequest(AbstractRequest request,
> +                                                ListenerName listenerName
> +    ) throws UnknownHostException {
> +        ByteBuffer buffer = request.serializeWithHeader(
> +            new RequestHeader(request.apiKey(), request.version(), 
> "test-client", 0));
> +
> +        // read the header from the buffer first so that the body can be 
> read next from the Request constructor
> +        RequestHeader header = RequestHeader.parse(buffer);
> +        // DelegationTokens require the context authenticated to be non 
> SecurityProtocol.PLAINTEXT
> +        // and have a non KafkaPrincipal.ANONYMOUS principal. This test is 
> done before the check
> +        // for forwarding because after forwarding the context will have a 
> different context.
> +        // We validate the context authenticated failure case in other 
> integration tests.
> +        RequestContext context = new RequestContext(header, "1", 
> InetAddress.getLocalHost(), Optional.empty(), new 
> KafkaPrincipal(KafkaPrincipal.USER_TYPE, "Alice"),
> +                listenerName, SecurityProtocol.SSL, ClientInformation.EMPTY, 
> false,
> +                Optional.of(kafkaPrincipalSerde));
> +        return new RequestChannel.Request(1, context, 0, MemoryPool.NONE, 
> buffer,
> +                requestChannelMetrics, scala.Option.apply(null));
> +    }
> +
> +    KafkaConfig createKafkaDefaultConfig() {
> +        Properties properties = TestUtils.createBrokerConfig(
> +            brokerId,
> +            "",
> +            true,
> +            true,
> +            TestUtils.RandomPort(),
> +            scala.Option.apply(null),
> +            scala.Option.apply(null),
> +            scala.Option.apply(null),
> +            true,
> +            false,
> +            TestUtils.RandomPort(),
> +            false,
> +            TestUtils.RandomPort(),
> +            false,
> +            TestUtils.RandomPort(),
> +            scala.Option.apply(null),
> +            1,
> +            false,
> +            1,
> +            (short) 1,
> +            false);
> +        properties.put(KafkaConfig.NodeIdProp(), Integer.toString(brokerId));
> +        properties.put(KafkaConfig.ProcessRolesProp(), "broker");
> +        int voterId = brokerId + 1;
> +        properties.put(KafkaConfig.QuorumVotersProp(), voterId + 
> "@localhost:9093");
> +        properties.put(KafkaConfig.ControllerListenerNamesProp(), "SSL");
> +        TestUtils.setIbpAndMessageFormatVersions(properties, 
> MetadataVersion.latestProduction());
> +        return new KafkaConfig(properties);
> +    }
> +}
> \ No newline at end of file
> diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
> b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
> index e769c7ddaed..3d0f3ac0cb8 100644
> --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
> +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
> @@ -17,12 +17,6 @@
>  
>  package kafka.server
>  
> -import java.net.InetAddress
> -import java.nio.charset.StandardCharsets
> -import java.util
> -import java.util.Arrays.asList
> -import java.util.concurrent.{CompletableFuture, TimeUnit}
> -import java.util.{Collections, Comparator, Optional, OptionalInt, 
> OptionalLong, Properties}
>  import kafka.api.LeaderAndIsr
>  import kafka.cluster.{Broker, Partition}
>  import kafka.controller.{ControllerContext, KafkaController}
> @@ -37,30 +31,28 @@ import org.apache.kafka.clients.admin.AlterConfigOp.OpType
>  import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
>  import org.apache.kafka.common.acl.AclOperation
>  import org.apache.kafka.common.config.ConfigResource
> +import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, 
> BROKER_LOGGER}
>  import org.apache.kafka.common.errors.UnsupportedVersionException
>  import org.apache.kafka.common.internals.{KafkaFutureImpl, Topic}
>  import org.apache.kafka.common.memory.MemoryPool
> -import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, 
> BROKER_LOGGER}
>  import 
> org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTopic,
>  AddPartitionsToTxnTopicCollection, AddPartitionsToTxnTransaction, 
> AddPartitionsToTxnTransactionCollection}
>  import 
> org.apache.kafka.common.message.AddPartitionsToTxnResponseData.AddPartitionsToTxnResult
> -import 
> org.apache.kafka.common.message.AlterConfigsRequestData.{AlterConfigsResourceCollection
>  => LAlterConfigsResourceCollection}
> -import 
> org.apache.kafka.common.message.AlterConfigsRequestData.{AlterConfigsResource 
> => LAlterConfigsResource}
> -import 
> org.apache.kafka.common.message.AlterConfigsRequestData.{AlterableConfigCollection
>  => LAlterableConfigCollection}
> -import 
> org.apache.kafka.common.message.AlterConfigsRequestData.{AlterableConfig => 
> LAlterableConfig}
> +import 
> org.apache.kafka.common.message.AlterConfigsRequestData.{AlterConfigsResource 
> => LAlterConfigsResource, AlterConfigsResourceCollection => 
> LAlterConfigsResourceCollection, AlterableConfig => LAlterableConfig, 
> AlterableConfigCollection => LAlterableConfigCollection}
>  import 
> org.apache.kafka.common.message.AlterConfigsResponseData.{AlterConfigsResourceResponse
>  => LAlterConfigsResourceResponse}
>  import org.apache.kafka.common.message.ApiMessageType.ListenerType
>  import 
> org.apache.kafka.common.message.ConsumerGroupDescribeResponseData.DescribedGroup
> +import 
> org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic
>  import 
> org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, 
> CreatableTopicCollection}
> +import 
> org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
>  import 
> org.apache.kafka.common.message.DescribeConfigsResponseData.DescribeConfigsResult
> -import 
> org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource
>  => IAlterConfigsResource}
> -import 
> org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResourceCollection
>  => IAlterConfigsResourceCollection}
> -import 
> org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterableConfig
>  => IAlterableConfig}
> -import 
> org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterableConfigCollection
>  => IAlterableConfigCollection}
> +import 
> org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource
>  => IAlterConfigsResource, AlterConfigsResourceCollection => 
> IAlterConfigsResourceCollection, AlterableConfig => IAlterableConfig, 
> AlterableConfigCollection => IAlterableConfigCollection}
>  import 
> org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.{AlterConfigsResourceResponse
>  => IAlterConfigsResourceResponse}
>  import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
> +import 
> org.apache.kafka.common.message.ListClientMetricsResourcesResponseData.ClientMetricsResource
>  import 
> org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, 
> ListOffsetsTopic}
>  import 
> org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
>  import 
> org.apache.kafka.common.message.OffsetDeleteRequestData.{OffsetDeleteRequestPartition,
>  OffsetDeleteRequestTopic, OffsetDeleteRequestTopicCollection}
> +import 
> org.apache.kafka.common.message.OffsetDeleteResponseData.{OffsetDeleteResponsePartition,
>  OffsetDeleteResponsePartitionCollection, OffsetDeleteResponseTopic, 
> OffsetDeleteResponseTopicCollection}
>  import 
> org.apache.kafka.common.message.StopReplicaRequestData.{StopReplicaPartitionState,
>  StopReplicaTopicState}
>  import 
> org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker,
>  UpdateMetadataEndpoint, UpdateMetadataPartitionState}
>  import org.apache.kafka.common.message._
> @@ -79,32 +71,33 @@ import org.apache.kafka.common.resource.{PatternType, 
> Resource, ResourcePattern,
>  import org.apache.kafka.common.security.auth.{KafkaPrincipal, 
> KafkaPrincipalSerde, SecurityProtocol}
>  import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
>  import org.apache.kafka.common.utils.{ProducerIdAndEpoch, SecurityUtils, 
> Utils}
> -import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, 
> TopicIdPartition, TopicPartition, Uuid}
> -import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, 
> Authorizer}
> -import org.junit.jupiter.api.Assertions._
> -import org.junit.jupiter.api.{AfterEach, Test}
> -import org.junit.jupiter.params.ParameterizedTest
> -import org.junit.jupiter.params.provider.{CsvSource, EnumSource, ValueSource}
> -import org.mockito.ArgumentMatchers.{any, anyBoolean, anyDouble, anyInt, 
> anyLong, anyShort, anyString, argThat, isNotNull}
> -import org.mockito.Mockito.{mock, reset, times, verify, when}
> -import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}
> -
> -import scala.collection.{Map, Seq, mutable}
> -import scala.jdk.CollectionConverters._
> -import 
> org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic
> -import 
> org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
> -import 
> org.apache.kafka.common.message.ListClientMetricsResourcesResponseData.ClientMetricsResource
> -import 
> org.apache.kafka.common.message.OffsetDeleteResponseData.{OffsetDeleteResponsePartition,
>  OffsetDeleteResponsePartitionCollection, OffsetDeleteResponseTopic, 
> OffsetDeleteResponseTopicCollection}
> +import org.apache.kafka.common._
>  import org.apache.kafka.coordinator.group.GroupCoordinator
>  import org.apache.kafka.server.ClientMetricsManager
> -import org.apache.kafka.server.common.{Features, MetadataVersion}
> +import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, 
> Authorizer}
>  import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0, 
> IBP_2_2_IV1}
> +import org.apache.kafka.server.common.{Features, MetadataVersion}
>  import org.apache.kafka.server.config.{ConfigType, Defaults}
>  import org.apache.kafka.server.metrics.ClientMetricsTestUtils
>  import org.apache.kafka.server.util.{FutureUtils, MockTime}
>  import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchParams, 
> FetchPartitionData, LogConfig}
> +import org.junit.jupiter.api.Assertions._
> +import org.junit.jupiter.api.{AfterEach, Test}
> +import org.junit.jupiter.params.ParameterizedTest
> +import org.junit.jupiter.params.provider.{CsvSource, EnumSource, ValueSource}
> +import org.mockito.ArgumentMatchers._
> +import org.mockito.Mockito._
> +import org.mockito.{ArgumentCaptor, ArgumentMatchers, Mockito}
>  
> +import java.net.InetAddress
> +import java.nio.charset.StandardCharsets
>  import java.time.Duration
> +import java.util
> +import java.util.Arrays.asList
> +import java.util.concurrent.{CompletableFuture, TimeUnit}
> +import java.util.{Collections, Comparator, Optional, OptionalInt, 
> OptionalLong, Properties}
> +import scala.collection.{Map, Seq, mutable}
> +import scala.jdk.CollectionConverters._
>  
>  class KafkaApisTest extends Logging {
>    private val requestChannel: RequestChannel = mock(classOf[RequestChannel])
> @@ -4146,7 +4139,7 @@ class KafkaApisTest extends Logging {
>      }
>    }
>  
> -  /**
> +    /**
>     * Verifies that sending a fetch request with version 9 works correctly 
> when
>     * ReplicaManager.getLogConfig returns None.
>     */
> diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala 
> b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
> index 87554990415..6047ad43790 100644
> --- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
> +++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
> @@ -16,28 +16,29 @@
>    */
>  package kafka.server
>  
> -import org.apache.kafka.common.{DirectoryId, Node, TopicPartition, Uuid}
> -
> -import java.util
> -import java.util.Arrays.asList
> -import java.util.Collections
>  import kafka.api.LeaderAndIsr
>  import kafka.server.metadata.{KRaftMetadataCache, MetadataSnapshot, 
> ZkMetadataCache}
> +import 
> org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponsePartition
>  import 
> org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker,
>  UpdateMetadataEndpoint, UpdateMetadataPartitionState, 
> UpdateMetadataTopicState}
> +import 
> org.apache.kafka.common.metadata.RegisterBrokerRecord.{BrokerEndpoint, 
> BrokerEndpointCollection}
> +import org.apache.kafka.common.metadata._
>  import org.apache.kafka.common.network.ListenerName
>  import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
>  import org.apache.kafka.common.record.RecordBatch
>  import org.apache.kafka.common.requests.{AbstractControlRequest, 
> UpdateMetadataRequest}
>  import org.apache.kafka.common.security.auth.SecurityProtocol
> -import org.apache.kafka.common.metadata.{BrokerRegistrationChangeRecord, 
> PartitionRecord, RegisterBrokerRecord, RemoveTopicRecord, TopicRecord}
> -import 
> org.apache.kafka.common.metadata.RegisterBrokerRecord.{BrokerEndpoint, 
> BrokerEndpointCollection}
> +import org.apache.kafka.common.{DirectoryId, Node, TopicPartition, Uuid}
>  import org.apache.kafka.image.{ClusterImage, MetadataDelta, MetadataImage, 
> MetadataProvenance}
> +import org.apache.kafka.metadata.LeaderRecoveryState
>  import org.apache.kafka.server.common.MetadataVersion
>  import org.junit.jupiter.api.Assertions._
> +import org.junit.jupiter.api.Test
>  import org.junit.jupiter.params.ParameterizedTest
>  import org.junit.jupiter.params.provider.MethodSource
> -import org.junit.jupiter.api.Test
>  
> +import java.util
> +import java.util.Arrays.asList
> +import java.util.Collections
>  import scala.collection.{Seq, mutable}
>  import scala.jdk.CollectionConverters._
>  
> @@ -53,7 +54,7 @@ object MetadataCacheTest {
>        MetadataCache.kRaftMetadataCache(1)
>      )
>  
> -  def updateCache(cache: MetadataCache, request: UpdateMetadataRequest): 
> Unit = {
> +  def updateCache(cache: MetadataCache, request: UpdateMetadataRequest, 
> records: Seq[ApiMessage] = List()): Unit = {
>      cache match {
>        case c: ZkMetadataCache => c.updateMetadata(0, request)
>        case c: KRaftMetadataCache => {
> @@ -126,6 +127,7 @@ object MetadataCacheTest {
>          request.topicStates().forEach { topic =>
>            toRecords(topic).foreach(delta.replay)
>          }
> +        records.foreach(delta.replay)
>          c.setImage(delta.apply(new MetadataProvenance(100L, 10, 1000L)))
>        }
>        case _ => throw new RuntimeException("Unsupported cache type")
> @@ -742,6 +744,169 @@ class MetadataCacheTest {
>      assertEquals(-1L, metadataCache.getAliveBrokerEpoch(1).getOrElse(-1L))
>    }
>  
> +  @Test
> +  def testGetTopicMetadataForDescribeTopicPartitionsResponse(): Unit = {
> +    val metadataCache = MetadataCache.kRaftMetadataCache(0)
> +
> +    val controllerId = 2
> +    val controllerEpoch = 1
> +    val securityProtocol = SecurityProtocol.PLAINTEXT
> +    val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
> +    val topic0 = "test0"
> +    val topic1 = "test1"
> +
> +    val topicIds = new util.HashMap[String, Uuid]()
> +    topicIds.put(topic0, Uuid.randomUuid())
> +    topicIds.put(topic1, Uuid.randomUuid())
> +
> +    val partitionMap = Map[(String, Int), PartitionRecord](
> +      (topic0, 0) -> new PartitionRecord()
> +        .setTopicId(topicIds.get(topic0))
> +        .setPartitionId(0)
> +        .setReplicas(asList(0, 1, 2))
> +        .setLeader(0)
> +        .setIsr(asList(0))
> +        .setEligibleLeaderReplicas(asList(1))
> +        .setLastKnownElr(asList(2))
> +        .setLeaderEpoch(0)
> +        .setPartitionEpoch(1)
> +        .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()),
> +      (topic0, 2) -> new PartitionRecord()
> +        .setTopicId(topicIds.get(topic0))
> +        .setPartitionId(2)
> +        .setReplicas(asList(0, 2, 3))
> +        .setLeader(3)
> +        .setIsr(asList(3))
> +        .setEligibleLeaderReplicas(asList(2))
> +        .setLastKnownElr(asList(0))
> +        .setLeaderEpoch(1)
> +        .setPartitionEpoch(2)
> +        .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()),
> +      (topic0, 1) -> new PartitionRecord()
> +        .setTopicId(topicIds.get(topic0))
> +        .setPartitionId(1)
> +        .setReplicas(asList(0, 1, 3))
> +        .setLeader(0)
> +        .setIsr(asList(0))
> +        .setEligibleLeaderReplicas(asList(1))
> +        .setLastKnownElr(asList(3))
> +        .setLeaderEpoch(0)
> +        .setPartitionEpoch(2)
> +        .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()),
> +      (topic1, 0) -> new PartitionRecord()
> +        .setTopicId(topicIds.get(topic1))
> +        .setPartitionId(0)
> +        .setReplicas(asList(0, 1, 2))
> +        .setLeader(2)
> +        .setIsr(asList(2))
> +        .setEligibleLeaderReplicas(asList(1))
> +        .setLastKnownElr(asList(0))
> +        .setLeaderEpoch(10)
> +        .setPartitionEpoch(11)
> +        .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()),
> +    )
> +
> +    val brokers = Seq(
> +      new UpdateMetadataBroker().setId(0).setEndpoints(Seq(new 
> UpdateMetadataEndpoint().setHost("foo0").setPort(9092).setSecurityProtocol(securityProtocol.id).setListener(listenerName.value)).asJava),
> +      new UpdateMetadataBroker().setId(1).setEndpoints(Seq(new 
> UpdateMetadataEndpoint().setHost("foo1").setPort(9093).setSecurityProtocol(securityProtocol.id).setListener(listenerName.value)).asJava),
> +      new UpdateMetadataBroker().setId(2).setEndpoints(Seq(new 
> UpdateMetadataEndpoint().setHost("foo2").setPort(9094).setSecurityProtocol(securityProtocol.id).setListener(listenerName.value)).asJava),
> +      new UpdateMetadataBroker().setId(3).setEndpoints(Seq(new 
> UpdateMetadataEndpoint().setHost("foo3").setPort(9095).setSecurityProtocol(securityProtocol.id).setListener(listenerName.value)).asJava),
> +    )
> +
> +    val version = ApiKeys.UPDATE_METADATA.latestVersion
> +    val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 
> controllerId, controllerEpoch, brokerEpoch,
> +      List[UpdateMetadataPartitionState]().asJava, brokers.asJava, 
> topicIds).build()
> +    var recordSeq = Seq[ApiMessage](
> +      new TopicRecord().setName(topic0).setTopicId(topicIds.get(topic0)),
> +      new TopicRecord().setName(topic1).setTopicId(topicIds.get(topic1))
> +    )
> +    recordSeq = recordSeq ++ partitionMap.values.toSeq
> +    MetadataCacheTest.updateCache(metadataCache, updateMetadataRequest, 
> recordSeq)
> +
> +    def checkTopicMetadata(topic: String, partitionIds: Set[Int], 
> partitions: mutable.Buffer[DescribeTopicPartitionsResponsePartition]): Unit = 
> {
> +      partitions.foreach(partition => {
> +        val partitionId = partition.partitionIndex()
> +        assertTrue(partitionIds.contains(partitionId))
> +        val expectedPartition = partitionMap.get((topic, partitionId)).get
> +        assertEquals(0, partition.errorCode())
> +        assertEquals(expectedPartition.leaderEpoch(), 
> partition.leaderEpoch())
> +        assertEquals(expectedPartition.partitionId(), 
> partition.partitionIndex())
> +        assertEquals(expectedPartition.eligibleLeaderReplicas(), 
> partition.eligibleLeaderReplicas())
> +        assertEquals(expectedPartition.isr(), partition.isrNodes())
> +        assertEquals(expectedPartition.lastKnownElr(), 
> partition.lastKnownElr())
> +        assertEquals(expectedPartition.leader(), partition.leaderId())
> +      })
> +    }
> +
> +    // Basic test
> +    var result = 
> metadataCache.getTopicMetadataForDescribeTopicResponse(Seq(topic0, 
> topic1).iterator, listenerName, _ => 0, 10, false).topics().asScala.toList
> +    assertEquals(2, result.size)
> +    var resultTopic = result(0)
> +    assertEquals(topic0, resultTopic.name())
> +    assertEquals(0, resultTopic.errorCode())
> +    assertEquals(topicIds.get(topic0), resultTopic.topicId())
> +    assertEquals(3, resultTopic.partitions().size())
> +    checkTopicMetadata(topic0, Set(0, 1, 2), 
> resultTopic.partitions().asScala)
> +
> +    resultTopic = result(1)
> +    assertEquals(topic1, resultTopic.name())
> +    assertEquals(0, resultTopic.errorCode())
> +    assertEquals(topicIds.get(topic1), resultTopic.topicId())
> +    assertEquals(1, resultTopic.partitions().size())
> +    checkTopicMetadata(topic1, Set(0), resultTopic.partitions().asScala)
> +
> +    // Quota reached
> +    var response = 
> metadataCache.getTopicMetadataForDescribeTopicResponse(Seq(topic0, 
> topic1).iterator, listenerName, _ => 0, 2, false)
> +    result = response.topics().asScala.toList
> +    assertEquals(1, result.size)
> +    resultTopic = result(0)
> +    assertEquals(topic0, resultTopic.name())
> +    assertEquals(0, resultTopic.errorCode())
> +    assertEquals(topicIds.get(topic0), resultTopic.topicId())
> +    assertEquals(2, resultTopic.partitions().size())
> +    checkTopicMetadata(topic0, Set(0, 1), resultTopic.partitions().asScala)
> +    assertEquals(topic0, response.nextCursor().topicName())
> +    assertEquals(2, response.nextCursor().partitionIndex())
> +
> +    // With start index
> +    result = 
> metadataCache.getTopicMetadataForDescribeTopicResponse(Seq(topic0).iterator, 
> listenerName, t => if (t.equals(topic0)) 1 else 0, 10, 
> false).topics().asScala.toList
> +    assertEquals(1, result.size)
> +    resultTopic = result(0)
> +    assertEquals(topic0, resultTopic.name())
> +    assertEquals(0, resultTopic.errorCode())
> +    assertEquals(topicIds.get(topic0), resultTopic.topicId())
> +    assertEquals(2, resultTopic.partitions().size())
> +    checkTopicMetadata(topic0, Set(1, 2), resultTopic.partitions().asScala)
> +
> +    // With start index and quota reached
> +    response = 
> metadataCache.getTopicMetadataForDescribeTopicResponse(Seq(topic0, 
> topic1).iterator, listenerName, t => if (t.equals(topic0)) 2 else 0, 1, false)
> +    result = response.topics().asScala.toList
> +    assertEquals(1, result.size)
> +
> +    resultTopic = result(0)
> +    assertEquals(topic0, resultTopic.name())
> +    assertEquals(0, resultTopic.errorCode())
> +    assertEquals(topicIds.get(topic0), resultTopic.topicId())
> +    assertEquals(1, resultTopic.partitions().size())
> +    checkTopicMetadata(topic0, Set(2), resultTopic.partitions().asScala)
> +    assertEquals(topic1, response.nextCursor().topicName())
> +    assertEquals(0, response.nextCursor().partitionIndex())
> +
> +    // When the first topic does not exist
> +    result = 
> metadataCache.getTopicMetadataForDescribeTopicResponse(Seq("Non-exist", 
> topic0).iterator, listenerName, t => if (t.equals("Non-exist")) 1 else 0, 1, 
> false).topics().asScala.toList
> +    assertEquals(2, result.size)
> +    resultTopic = result(0)
> +    assertEquals("Non-exist", resultTopic.name())
> +    assertEquals(3, resultTopic.errorCode())
> +
> +    resultTopic = result(1)
> +    assertEquals(topic0, resultTopic.name())
> +    assertEquals(0, resultTopic.errorCode())
> +    assertEquals(topicIds.get(topic0), resultTopic.topicId())
> +    assertEquals(1, resultTopic.partitions().size())
> +    checkTopicMetadata(topic0, Set(0), resultTopic.partitions().asScala)
> +  }
> +
>    @ParameterizedTest
>    @MethodSource(Array("cacheProvider"))
>    def testGetPartitionInfo(cache: MetadataCache): Unit = {
> diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala 
> b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
> index bf9dd7b736e..d765dda5a0a 100644
> --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
> +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
> @@ -712,6 +712,9 @@ class RequestQuotaTest extends BaseRequestTest {
>          case ApiKeys.LIST_CLIENT_METRICS_RESOURCES =>
>            new ListClientMetricsResourcesRequest.Builder(new 
> ListClientMetricsResourcesRequestData())
>  
> +        case ApiKeys.DESCRIBE_TOPIC_PARTITIONS =>
> +          new DescribeTopicPartitionsRequest.Builder(new 
> DescribeTopicPartitionsRequestData())
> +
>          case _ =>
>            throw new IllegalArgumentException("Unsupported API key " + apiKey)
>      }
> diff --git 
> a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
>  
> b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
> index adac0fb23db..c2a0bbc4928 100644
> --- 
> a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
> +++ 
> b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
> @@ -52,7 +52,7 @@ public class PartitionChangeBuilder {
>      public static boolean changeRecordIsNoOp(PartitionChangeRecord record) {
>          if (record.isr() != null) return false;
>          if (record.eligibleLeaderReplicas() != null) return false;
> -        if (record.lastKnownELR() != null) return false;
> +        if (record.lastKnownElr() != null) return false;
>          if (record.leader() != NO_LEADER_CHANGE) return false;
>          if (record.replicas() != null) return false;
>          if (record.removingReplicas() != null) return false;
> @@ -492,7 +492,7 @@ public class PartitionChangeBuilder {
>          if (record.isr() != null && record.isr().isEmpty() && 
> (partition.lastKnownElr.length != 1 ||
>              partition.lastKnownElr[0] != partition.leader)) {
>              // Only update the last known leader when the first time the 
> partition becomes leaderless.
> -            record.setLastKnownELR(Arrays.asList(partition.leader));
> +            record.setLastKnownElr(Arrays.asList(partition.leader));
>          }
>      }
>  
> @@ -518,14 +518,14 @@ public class PartitionChangeBuilder {
>          }
>  
>          if 
> (!targetLastKnownElr.equals(Replicas.toList(partition.lastKnownElr))) {
> -            record.setLastKnownELR(targetLastKnownElr);
> +            record.setLastKnownElr(targetLastKnownElr);
>          }
>      }
>  
>      private void maybePopulateTargetElr() {
>          if (!eligibleLeaderReplicasEnabled) return;
>  
> -        // If the ISR is larger or equal to the min ISR, clear the ELR and 
> lastKnownELR.
> +        // If the ISR is larger or equal to the min ISR, clear the ELR and 
> LastKnownElr.
>          if (targetIsr.size() >= minISR) {
>              targetElr = Collections.emptyList();
>              targetLastKnownElr = Collections.emptyList();
> diff --git 
> a/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java 
> b/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
> index 97b9a60116d..72476cf206c 100644
> --- 
> a/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
> +++ 
> b/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
> @@ -200,7 +200,7 @@ public class PartitionRegistration {
>              record.leaderEpoch(),
>              record.partitionEpoch(),
>              Replicas.toArray(record.eligibleLeaderReplicas()),
> -            Replicas.toArray(record.lastKnownELR()));
> +            Replicas.toArray(record.lastKnownElr()));
>      }
>  
>      private PartitionRegistration(int[] replicas, Uuid[] directories, int[] 
> isr, int[] removingReplicas,
> @@ -255,7 +255,7 @@ public class PartitionRegistration {
>          LeaderRecoveryState newLeaderRecoveryState = 
> leaderRecoveryState.changeTo(record.leaderRecoveryState());
>  
>          int[] newElr = (record.eligibleLeaderReplicas() == null) ? elr : 
> Replicas.toArray(record.eligibleLeaderReplicas());
> -        int[] newLastKnownElr = (record.lastKnownELR() == null) ? 
> lastKnownElr : Replicas.toArray(record.lastKnownELR());
> +        int[] newLastKnownElr = (record.lastKnownElr() == null) ? 
> lastKnownElr : Replicas.toArray(record.lastKnownElr());
>          return new PartitionRegistration(newReplicas,
>              defaultToMigrating(newDirectories, replicas.length),
>              newIsr,
> @@ -381,7 +381,7 @@ public class PartitionRegistration {
>              // The following are tagged fields, we should only set them when 
> there are some contents, in order to save
>              // spaces.
>              if (elr.length > 0) 
> record.setEligibleLeaderReplicas(Replicas.toList(elr));
> -            if (lastKnownElr.length > 0) 
> record.setLastKnownELR(Replicas.toList(lastKnownElr));
> +            if (lastKnownElr.length > 0) 
> record.setLastKnownElr(Replicas.toList(lastKnownElr));
>          }
>          if (options.metadataVersion().isDirectoryAssignmentSupported()) {
>              record.setDirectories(Uuid.toList(directories));
> diff --git 
> a/metadata/src/main/resources/common/metadata/PartitionChangeRecord.json 
> b/metadata/src/main/resources/common/metadata/PartitionChangeRecord.json
> index f22669943ca..b6a3c2228b2 100644
> --- a/metadata/src/main/resources/common/metadata/PartitionChangeRecord.json
> +++ b/metadata/src/main/resources/common/metadata/PartitionChangeRecord.json
> @@ -18,7 +18,7 @@
>    "type": "metadata",
>    "name": "PartitionChangeRecord",
>    // Version 1 adds Directories for KIP-858.
> -  // Version 2 implements Eligible Leader Replicas and LastKnownELR as 
> described in KIP-966.
> +  // Version 2 implements Eligible Leader Replicas and LastKnownElr as 
> described in KIP-966.
>    "validVersions": "0-2",
>    "flexibleVersions": "0+",
>    "fields": [
> @@ -49,8 +49,8 @@
>      { "name": "EligibleLeaderReplicas", "type": "[]int32", "default": 
> "null", "entityType": "brokerId",
>        "versions": "2+", "nullableVersions": "2+", "taggedVersions": "2+", 
> "tag": 6,
>        "about": "null if the ELR didn't change; the new eligible leader 
> replicas otherwise." },
> -    { "name": "LastKnownELR", "type": "[]int32", "default": "null", 
> "entityType": "brokerId",
> +    { "name": "LastKnownElr", "type": "[]int32", "default": "null", 
> "entityType": "brokerId",
>        "versions": "2+", "nullableVersions": "2+", "taggedVersions": "2+", 
> "tag": 7,
> -      "about": "null if the LastKnownELR didn't change; the last known 
> eligible leader replicas otherwise." }
> +      "about": "null if the LastKnownElr didn't change; the last known 
> eligible leader replicas otherwise." }
>    ]
>  }
> diff --git a/metadata/src/main/resources/common/metadata/PartitionRecord.json 
> b/metadata/src/main/resources/common/metadata/PartitionRecord.json
> index 5c84a2e556f..c554561b330 100644
> --- a/metadata/src/main/resources/common/metadata/PartitionRecord.json
> +++ b/metadata/src/main/resources/common/metadata/PartitionRecord.json
> @@ -18,7 +18,7 @@
>    "type": "metadata",
>    "name": "PartitionRecord",
>    // Version 1 adds Directories for KIP-858
> -  // Version 2 implements Eligible Leader Replicas and LastKnownELR as 
> described in KIP-966.
> +  // Version 2 implements Eligible Leader Replicas and LastKnownElr as 
> described in KIP-966.
>    "validVersions": "0-2",
>    "flexibleVersions": "0+",
>    "fields": [
> @@ -47,7 +47,7 @@
>      { "name": "EligibleLeaderReplicas", "type": "[]int32", "default": 
> "null", "entityType": "brokerId",
>        "versions": "2+", "nullableVersions": "2+", "taggedVersions": "2+", 
> "tag": 1,
>        "about": "The eligible leader replicas of this partition." },
> -    { "name": "LastKnownELR", "type": "[]int32", "default": "null", 
> "entityType": "brokerId",
> +    { "name": "LastKnownElr", "type": "[]int32", "default": "null", 
> "entityType": "brokerId",
>        "versions": "2+", "nullableVersions": "2+", "taggedVersions": "2+", 
> "tag": 2,
>        "about": "The last known eligible leader replicas of this partition." }
>    ]
> diff --git 
> a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
>  
> b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
> index cb00a3830d2..efc9bd2a24f 100644
> --- 
> a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
> +++ 
> b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
> @@ -86,7 +86,7 @@ public class PartitionChangeBuilderTest {
>          assertFalse(changeRecordIsNoOp(new PartitionChangeRecord().
>                  setEligibleLeaderReplicas(Arrays.asList(5))));
>          assertFalse(changeRecordIsNoOp(new PartitionChangeRecord().
> -                setLastKnownELR(Arrays.asList(6))));
> +                setLastKnownElr(Arrays.asList(6))));
>          assertFalse(
>              changeRecordIsNoOp(
>                  new PartitionChangeRecord()
> @@ -575,7 +575,7 @@ public class PartitionChangeBuilderTest {
>          if (version >= 2) {
>              // The test partition has ELR, so unclean election will clear 
> these fiedls.
>              record.setEligibleLeaderReplicas(Collections.emptyList())
> -                .setLastKnownELR(Collections.emptyList());
> +                .setLastKnownElr(Collections.emptyList());
>          }
>  
>          expectedRecord = new ApiMessageAndVersion(record, version);
> @@ -890,7 +890,7 @@ public class PartitionChangeBuilderTest {
>  
>          // Both versions will set the elr and lastKnownElr as empty list.
>          record.setEligibleLeaderReplicas(Collections.emptyList())
> -            .setLastKnownELR(Collections.emptyList());
> +            .setLastKnownElr(Collections.emptyList());
>          ApiMessageAndVersion expectedRecord = new 
> ApiMessageAndVersion(record, version);
>          assertEquals(Optional.of(expectedRecord), builder.build());
>          partition = partition.merge((PartitionChangeRecord) 
> builder.build().get().message());
> @@ -935,9 +935,9 @@ public class PartitionChangeBuilderTest {
>              .setLeaderRecoveryState(LeaderRecoveryState.NO_CHANGE);
>          if (version < 2) {
>              record.setEligibleLeaderReplicas(Collections.emptyList());
> -            record.setLastKnownELR(Collections.emptyList());
> +            record.setLastKnownElr(Collections.emptyList());
>          }
> -        // No change is expected to ELR/LastKnownELR.
> +        // No change is expected to ELR/LastKnownElr.
>          ApiMessageAndVersion expectedRecord = new 
> ApiMessageAndVersion(record, version);
>          assertEquals(Optional.of(expectedRecord), builder.build());
>          partition = partition.merge((PartitionChangeRecord) 
> builder.build().get().message());
> @@ -987,7 +987,7 @@ public class PartitionChangeBuilderTest {
>              .setLeaderRecoveryState(LeaderRecoveryState.NO_CHANGE);
>          if (version >= 2) {
>              record.setEligibleLeaderReplicas(Arrays.asList(2))
> -                .setLastKnownELR(Arrays.asList(3));
> +                .setLastKnownElr(Arrays.asList(3));
>          } else {
>              record.setEligibleLeaderReplicas(Collections.emptyList());
>          }
> @@ -1161,7 +1161,7 @@ public class PartitionChangeBuilderTest {
>              .setEligibleLeaderReplicas(Arrays.asList(1, 2, 3, 4));
>  
>          if (lastKnownLeaderEnabled) {
> -            record.setLastKnownELR(Arrays.asList(1));
> +            record.setLastKnownElr(Arrays.asList(1));
>          }
>  
>          ApiMessageAndVersion expectedRecord = new 
> ApiMessageAndVersion(record, version);
> @@ -1178,7 +1178,7 @@ public class PartitionChangeBuilderTest {
>                  .setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
>                  
> .setUseLastKnownLeaderInBalancedRecovery(lastKnownLeaderEnabled);
>              PartitionChangeRecord changeRecord = (PartitionChangeRecord) 
> builder.build().get().message();
> -            assertTrue(changeRecord.lastKnownELR() == null, 
> changeRecord.toString());
> +            assertTrue(changeRecord.lastKnownElr() == null, 
> changeRecord.toString());
>          } else {
>              assertTrue(Arrays.equals(new int[]{}, partition.lastKnownElr), 
> partition.toString());
>          }
> @@ -1217,7 +1217,7 @@ public class PartitionChangeBuilderTest {
>                  .setIsr(Arrays.asList(1))
>                  .setLeader(1)
>                  
> .setLeaderRecoveryState(LeaderRecoveryState.RECOVERING.value())
> -                .setLastKnownELR(Collections.emptyList()),
> +                .setLastKnownElr(Collections.emptyList()),
>              version
>          );
>          assertEquals(Optional.of(expectedRecord), builder.build());
> diff --git 
> a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
>  
> b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
> index 37c3cd16f80..9de6238eca1 100644
> --- 
> a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
> +++ 
> b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
> @@ -320,7 +320,7 @@ public class PartitionRegistrationTest {
>          if (metadataVersion.isElrSupported()) {
>              expectRecord.
>                  setEligibleLeaderReplicas(Arrays.asList(2, 3)).
> -                setLastKnownELR(Arrays.asList(4));
> +                setLastKnownElr(Arrays.asList(4));
>          }
>          if (metadataVersion.isDirectoryAssignmentSupported()) {
>              expectRecord.setDirectories(Arrays.asList(
> diff --git 
> a/server/src/main/java/org/apache/kafka/server/config/Defaults.java 
> b/server/src/main/java/org/apache/kafka/server/config/Defaults.java
> index f6682ac2f3f..121f89dc8a1 100644
> --- a/server/src/main/java/org/apache/kafka/server/config/Defaults.java
> +++ b/server/src/main/java/org/apache/kafka/server/config/Defaults.java
> @@ -193,6 +193,9 @@ public class Defaults {
>      public static final int MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS = 1000;
>      public static final int FETCH_MAX_BYTES = 55 * 1024 * 1024;
>  
> +    /** ********* Request Limit Configuration ***********/
> +    public static final int MAX_REQUEST_PARTITION_SIZE_LIMIT = 2000;
> +
>      /** ********* Quota Configuration *********/
>      public static final int NUM_QUOTA_SAMPLES = 
> ClientQuotaManagerConfig.DEFAULT_NUM_QUOTA_SAMPLES;
>      public static final int QUOTA_WINDOW_SIZE_SECONDS = 
> ClientQuotaManagerConfig.DEFAULT_QUOTA_WINDOW_SIZE_SECONDS;

Reply via email to