This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2cf87bff9b6 KAFKA-16953; Properly implement the sending of 
DescribeQuorumResponse (#16637)
2cf87bff9b6 is described below

commit 2cf87bff9b6b5136a22539edf48b2d7cc668bdf9
Author: Alyssa Huang <[email protected]>
AuthorDate: Mon Jul 29 11:36:17 2024 -0700

    KAFKA-16953; Properly implement the sending of DescribeQuorumResponse 
(#16637)
    
    This change allows the KRaft leader to send the DescribeQuorumResponse 
version based on the schema version used by the client.
    
    Reviewers: José Armando García Sancio <[email protected]>
---
 .../main/java/org/apache/kafka/raft/Endpoints.java |  14 +
 .../org/apache/kafka/raft/KafkaRaftClient.java     |   8 +-
 .../java/org/apache/kafka/raft/LeaderState.java    | 112 ++---
 .../main/java/org/apache/kafka/raft/RaftUtil.java  |  73 ++-
 .../org/apache/kafka/raft/internals/VoterSet.java  |   5 +-
 .../org/apache/kafka/raft/KafkaRaftClientTest.java | 427 ++++++++++++++--
 .../org/apache/kafka/raft/LeaderStateTest.java     | 543 ---------------------
 .../apache/kafka/raft/RaftClientTestContext.java   |  29 +-
 8 files changed, 542 insertions(+), 669 deletions(-)

diff --git a/raft/src/main/java/org/apache/kafka/raft/Endpoints.java 
b/raft/src/main/java/org/apache/kafka/raft/Endpoints.java
index 41292252ae6..5e979022ec3 100644
--- a/raft/src/main/java/org/apache/kafka/raft/Endpoints.java
+++ b/raft/src/main/java/org/apache/kafka/raft/Endpoints.java
@@ -19,6 +19,7 @@ package org.apache.kafka.raft;
 import org.apache.kafka.common.message.AddRaftVoterRequestData;
 import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
 import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
+import org.apache.kafka.common.message.DescribeQuorumResponseData;
 import org.apache.kafka.common.message.EndQuorumEpochRequestData;
 import org.apache.kafka.common.message.EndQuorumEpochResponseData;
 import org.apache.kafka.common.message.FetchResponseData;
@@ -113,7 +114,20 @@ public final class Endpoints {
                     .setPort(entry.getValue().getPort())
             );
         }
+        return listeners;
+    }
 
+    public DescribeQuorumResponseData.ListenerCollection 
toDescribeQuorumResponseListeners() {
+        DescribeQuorumResponseData.ListenerCollection listeners =
+            new 
DescribeQuorumResponseData.ListenerCollection(endpoints.size());
+        for (Map.Entry<ListenerName, InetSocketAddress> entry : 
endpoints.entrySet()) {
+            listeners.add(
+                new DescribeQuorumResponseData.Listener()
+                    .setName(entry.getKey().value())
+                    .setHost(entry.getValue().getHostString())
+                    .setPort(entry.getValue().getPort())
+            );
+        }
         return listeners;
     }
 
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java 
b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 88de2e94d21..0b454666db3 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -1706,8 +1706,12 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
         return RaftUtil.singletonDescribeQuorumResponse(
             requestMetadata.apiVersion(),
             log.topicPartition(),
-            leaderState.describeQuorum(currentTimeMs),
-            leaderState.nodes(currentTimeMs)
+            quorum.localIdOrThrow(),
+            leaderState.epoch(),
+            
leaderState.highWatermark().map(LogOffsetMetadata::offset).orElse(-1L),
+            leaderState.voterStates().values(),
+            leaderState.observerStates(currentTimeMs).values(),
+            currentTimeMs
         );
     }
 
diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java 
b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
index e07a2ad04cb..06deb28c69c 100644
--- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.raft;
 
-import org.apache.kafka.common.message.DescribeQuorumResponseData;
 import org.apache.kafka.common.message.KRaftVersionRecord;
 import org.apache.kafka.common.message.LeaderChangeMessage;
 import org.apache.kafka.common.message.LeaderChangeMessage.Voter;
@@ -38,7 +37,6 @@ import org.apache.kafka.server.common.KRaftVersion;
 import org.slf4j.Logger;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -113,7 +111,7 @@ public class LeaderState<T> implements EpochState {
             boolean hasAcknowledgedLeader = voterNode.isVoter(localReplicaKey);
             this.voterStates.put(
                 voterNode.voterKey().id(),
-                new ReplicaState(voterNode.voterKey(), hasAcknowledgedLeader)
+                new ReplicaState(voterNode.voterKey(), hasAcknowledgedLeader, 
voterNode.listeners())
             );
         }
         this.grantingVoters = Collections.unmodifiableSet(new 
HashSet<>(grantingVoters));
@@ -395,6 +393,15 @@ public class LeaderState<T> implements EpochState {
         return endpoints;
     }
 
+    Map<Integer, ReplicaState> voterStates() {
+        return voterStates;
+    }
+
+    Map<ReplicaKey, ReplicaState> observerStates(final long currentTimeMs) {
+        clearInactiveObservers(currentTimeMs);
+        return observerStates;
+    }
+
     public Set<Integer> grantingVoters() {
         return this.grantingVoters;
     }
@@ -599,7 +606,7 @@ public class LeaderState<T> implements EpochState {
     private ReplicaState getOrCreateReplicaState(ReplicaKey replicaKey) {
         ReplicaState state = voterStates.get(replicaKey.id());
         if (state == null || !state.matchesKey(replicaKey)) {
-            observerStates.putIfAbsent(replicaKey, new 
ReplicaState(replicaKey, false));
+            observerStates.putIfAbsent(replicaKey, new 
ReplicaState(replicaKey, false, Endpoints.empty()));
             return observerStates.get(replicaKey);
         }
         return state;
@@ -614,58 +621,6 @@ public class LeaderState<T> implements EpochState {
         return Optional.ofNullable(state);
     }
 
-    public DescribeQuorumResponseData.PartitionData describeQuorum(long 
currentTimeMs) {
-        clearInactiveObservers(currentTimeMs);
-
-        return new DescribeQuorumResponseData.PartitionData()
-            .setErrorCode(Errors.NONE.code())
-            .setLeaderId(localReplicaKey.id())
-            .setLeaderEpoch(epoch)
-            
.setHighWatermark(highWatermark.map(LogOffsetMetadata::offset).orElse(-1L))
-            .setCurrentVoters(describeReplicaStates(voterStates.values(), 
currentTimeMs))
-            .setObservers(describeReplicaStates(observerStates.values(), 
currentTimeMs));
-    }
-
-    public DescribeQuorumResponseData.NodeCollection nodes(long currentTimeMs) 
{
-        clearInactiveObservers(currentTimeMs);
-
-        // KAFKA-16953 will add support for including the node listeners in 
the node collection
-        return new DescribeQuorumResponseData.NodeCollection();
-    }
-
-    private List<DescribeQuorumResponseData.ReplicaState> 
describeReplicaStates(
-        Collection<ReplicaState> states,
-        long currentTimeMs
-    ) {
-        return states
-            .stream()
-            .map(replicaState -> describeReplicaState(replicaState, 
currentTimeMs))
-            .collect(Collectors.toList());
-    }
-
-    private DescribeQuorumResponseData.ReplicaState describeReplicaState(
-        ReplicaState replicaState,
-        long currentTimeMs
-    ) {
-        final long lastCaughtUpTimestamp;
-        final long lastFetchTimestamp;
-        if (replicaState.matchesKey(localReplicaKey)) {
-            lastCaughtUpTimestamp = currentTimeMs;
-            lastFetchTimestamp = currentTimeMs;
-        } else {
-            lastCaughtUpTimestamp = replicaState.lastCaughtUpTimestamp;
-            lastFetchTimestamp = replicaState.lastFetchTimestamp;
-        }
-
-        // KAFKA-16953 will add support for the replica directory id
-        return new DescribeQuorumResponseData.ReplicaState()
-            .setReplicaId(replicaState.replicaKey.id())
-            
.setLogEndOffset(replicaState.endOffset.map(LogOffsetMetadata::offset).orElse(-1L))
-            .setLastCaughtUpTimestamp(lastCaughtUpTimestamp)
-            .setLastFetchTimestamp(lastFetchTimestamp);
-
-    }
-
     /**
      * Clear observer states that have not been active for a while and are not 
the leader.
      */
@@ -688,7 +643,7 @@ public class LeaderState<T> implements EpochState {
         // Compute the new voter states map
         for (VoterSet.VoterNode voterNode : lastVoterSet.voterNodes()) {
             ReplicaState state = getReplicaState(voterNode.voterKey())
-                .orElse(new ReplicaState(voterNode.voterKey(), false));
+                .orElse(new ReplicaState(voterNode.voterKey(), false, 
voterNode.listeners()));
 
             // Remove the voter from the previous data structures
             oldVoterStates.remove(voterNode.voterKey().id());
@@ -702,20 +657,23 @@ public class LeaderState<T> implements EpochState {
 
         // Move any of the remaining old voters to observerStates
         for (ReplicaState replicaStateEntry : oldVoterStates.values()) {
+            replicaStateEntry.clearListeners();
             observerStates.putIfAbsent(replicaStateEntry.replicaKey, 
replicaStateEntry);
         }
     }
 
-    private static class ReplicaState implements Comparable<ReplicaState> {
-        ReplicaKey replicaKey;
-        Optional<LogOffsetMetadata> endOffset;
-        long lastFetchTimestamp;
-        long lastFetchLeaderLogEndOffset;
-        long lastCaughtUpTimestamp;
-        boolean hasAcknowledgedLeader;
+    static class ReplicaState implements Comparable<ReplicaState> {
+        private ReplicaKey replicaKey;
+        private Endpoints listeners;
+        private Optional<LogOffsetMetadata> endOffset;
+        private long lastFetchTimestamp;
+        private long lastFetchLeaderLogEndOffset;
+        private long lastCaughtUpTimestamp;
+        private boolean hasAcknowledgedLeader;
 
-        public ReplicaState(ReplicaKey replicaKey, boolean 
hasAcknowledgedLeader) {
+        public ReplicaState(ReplicaKey replicaKey, boolean 
hasAcknowledgedLeader, Endpoints listeners) {
             this.replicaKey = replicaKey;
+            this.listeners = listeners;
             this.endOffset = Optional.empty();
             this.lastFetchTimestamp = -1;
             this.lastFetchLeaderLogEndOffset = -1;
@@ -723,6 +681,26 @@ public class LeaderState<T> implements EpochState {
             this.hasAcknowledgedLeader = hasAcknowledgedLeader;
         }
 
+        public ReplicaKey replicaKey() {
+            return replicaKey;
+        }
+
+        public Endpoints listeners() {
+            return listeners;
+        }
+
+        public Optional<LogOffsetMetadata> endOffset() {
+            return endOffset;
+        }
+
+        public long lastFetchTimestamp() {
+            return lastFetchTimestamp;
+        }
+
+        public long lastCaughtUpTimestamp() {
+            return lastCaughtUpTimestamp;
+        }
+
         void setReplicaKey(ReplicaKey replicaKey) {
             if (this.replicaKey.id() != replicaKey.id()) {
                 throw new IllegalArgumentException(
@@ -747,6 +725,10 @@ public class LeaderState<T> implements EpochState {
             this.replicaKey = replicaKey;
         }
 
+        void clearListeners() {
+            this.listeners = Endpoints.empty();
+        }
+
         boolean matchesKey(ReplicaKey replicaKey) {
             if (this.replicaKey.id() != replicaKey.id()) return false;
 
diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java 
b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
index e5c36e13703..4d2f3bc06e8 100644
--- a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
+++ b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
@@ -41,6 +41,7 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.raft.internals.ReplicaKey;
 
 import java.net.InetSocketAddress;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
@@ -481,8 +482,12 @@ public class RaftUtil {
     public static DescribeQuorumResponseData singletonDescribeQuorumResponse(
         short apiVersion,
         TopicPartition topicPartition,
-        DescribeQuorumResponseData.PartitionData partitionData,
-        DescribeQuorumResponseData.NodeCollection nodes
+        int leaderId,
+        int leaderEpoch,
+        long highWatermark,
+        Collection<LeaderState.ReplicaState> voters,
+        Collection<LeaderState.ReplicaState> observers,
+        long currentTimeMs
     ) {
         DescribeQuorumResponseData response = new DescribeQuorumResponseData()
             .setTopics(
@@ -491,16 +496,25 @@ public class RaftUtil {
                         .setTopicName(topicPartition.topic())
                         .setPartitions(
                             Collections.singletonList(
-                                
partitionData.setPartitionIndex(topicPartition.partition())
-                            )
-                        )
-                )
-            );
-
+                                new DescribeQuorumResponseData.PartitionData()
+                                    
.setPartitionIndex(topicPartition.partition())
+                                    .setErrorCode(Errors.NONE.code())
+                                    .setLeaderId(leaderId)
+                                    .setLeaderEpoch(leaderEpoch)
+                                    .setHighWatermark(highWatermark)
+                                    
.setCurrentVoters(toReplicaStates(apiVersion, leaderId, voters, currentTimeMs))
+                                    .setObservers(toReplicaStates(apiVersion, 
leaderId, observers, currentTimeMs))))));
         if (apiVersion >= 2) {
+            DescribeQuorumResponseData.NodeCollection nodes = new 
DescribeQuorumResponseData.NodeCollection(voters.size());
+            for (LeaderState.ReplicaState voter : voters) {
+                nodes.add(
+                    new DescribeQuorumResponseData.Node()
+                        .setNodeId(voter.replicaKey().id())
+                        
.setListeners(voter.listeners().toDescribeQuorumResponseListeners())
+                );
+            }
             response.setNodes(nodes);
         }
-
         return response;
     }
 
@@ -528,7 +542,7 @@ public class RaftUtil {
             .setErrorCode(error.code())
             .setErrorMessage(errorMessage);
     }
-
+    
     public static RemoveRaftVoterRequestData removeVoterRequest(
         String clusterId,
         ReplicaKey voter
@@ -550,6 +564,45 @@ public class RaftUtil {
             .setErrorMessage(errorMessage);
     }
 
+    private static List<DescribeQuorumResponseData.ReplicaState> 
toReplicaStates(
+        short apiVersion,
+        int leaderId,
+        Collection<LeaderState.ReplicaState> states,
+        long currentTimeMs
+    ) {
+        return states
+            .stream()
+            .map(replicaState -> toReplicaState(apiVersion, leaderId, 
replicaState, currentTimeMs))
+            .collect(Collectors.toList());
+    }
+
+    private static DescribeQuorumResponseData.ReplicaState toReplicaState(
+        short apiVersion,
+        int leaderId,
+        LeaderState.ReplicaState replicaState,
+        long currentTimeMs
+    ) {
+        final long lastCaughtUpTimestamp;
+        final long lastFetchTimestamp;
+        if (replicaState.replicaKey().id() == leaderId) {
+            lastCaughtUpTimestamp = currentTimeMs;
+            lastFetchTimestamp = currentTimeMs;
+        } else {
+            lastCaughtUpTimestamp = replicaState.lastCaughtUpTimestamp();
+            lastFetchTimestamp = replicaState.lastFetchTimestamp();
+        }
+        DescribeQuorumResponseData.ReplicaState replicaStateData = new 
DescribeQuorumResponseData.ReplicaState()
+            .setReplicaId(replicaState.replicaKey().id())
+            
.setLogEndOffset(replicaState.endOffset().map(LogOffsetMetadata::offset).orElse(-1L))
+            .setLastCaughtUpTimestamp(lastCaughtUpTimestamp)
+            .setLastFetchTimestamp(lastFetchTimestamp);
+
+        if (apiVersion >= 2) {
+            
replicaStateData.setReplicaDirectoryId(replicaState.replicaKey().directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID));
+        }
+        return replicaStateData;
+    }
+
     public static Optional<ReplicaKey> voteRequestVoterKey(
         VoteRequestData request,
         VoteRequestData.PartitionData partition
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java 
b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java
index e665794c2af..447b44a93d9 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java
@@ -325,7 +325,10 @@ public final class VoterSet {
             }
         }
 
-        Endpoints listeners() {
+        /**
+         * Returns the listeners of the voter node
+         */
+        public Endpoints listeners() {
             return listeners;
         }
 
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java 
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index 8f78f29c7ba..419618c00ff 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -42,6 +42,8 @@ import org.apache.kafka.raft.errors.BufferAllocationException;
 import org.apache.kafka.raft.errors.NotLeaderException;
 import org.apache.kafka.raft.errors.UnexpectedBaseOffsetException;
 import org.apache.kafka.raft.internals.ReplicaKey;
+import org.apache.kafka.raft.internals.VoterSet;
+import org.apache.kafka.raft.internals.VoterSetTest;
 import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.Test;
@@ -2765,77 +2767,412 @@ public class KafkaRaftClientTest {
 
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
-    public void testDescribeQuorum(boolean withKip853Rpc) throws Exception {
+    public void testDescribeQuorumWithOnlyStaticVoters(boolean withKip853Rpc) 
throws Exception {
         int localId = randomReplicaId();
-        ReplicaKey closeFollower = replicaKey(localId + 2, withKip853Rpc);
-        ReplicaKey laggingFollower = replicaKey(localId + 1, withKip853Rpc);
-        Set<Integer> voters = Utils.mkSet(localId, closeFollower.id(), 
laggingFollower.id());
+        ReplicaKey local = replicaKey(localId, true);
+        ReplicaKey follower1 = replicaKey(localId + 1, true);
+        Set<Integer> voters = Utils.mkSet(localId, follower1.id());
 
-        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, local.directoryId().get())
+            .withStaticVoters(voters)
             .withKip853Rpc(withKip853Rpc)
             .build();
 
         context.becomeLeader();
         int epoch = context.currentEpoch();
 
-        long laggingFollowerFetchTime = context.time.milliseconds();
-        context.deliverRequest(context.fetchRequest(1, laggingFollower, 1L, 
epoch, 0));
+        // Describe quorum response will not include directory ids
+        context.deliverRequest(context.describeQuorumRequest());
         context.pollUntilResponse();
-        context.assertSentFetchPartitionResponse(1L, epoch);
+        List<ReplicaState> expectedVoterStates = Arrays.asList(
+            new ReplicaState()
+                .setReplicaId(localId)
+                .setReplicaDirectoryId(ReplicaKey.NO_DIRECTORY_ID)
+                .setLogEndOffset(1L)
+                .setLastFetchTimestamp(context.time.milliseconds())
+                .setLastCaughtUpTimestamp(context.time.milliseconds()),
+            new ReplicaState()
+                .setReplicaId(follower1.id())
+                .setReplicaDirectoryId(ReplicaKey.NO_DIRECTORY_ID)
+                .setLogEndOffset(-1L)
+                .setLastFetchTimestamp(-1)
+                .setLastCaughtUpTimestamp(-1));
+        context.assertSentDescribeQuorumResponse(localId, epoch, -1L, 
expectedVoterStates, Collections.emptyList());
+    }
 
-        context.client.scheduleAppend(epoch, Arrays.asList("foo", "bar"));
+    @ParameterizedTest
+    @CsvSource({ "true, true", "true, false", "false, false" })
+    public void testDescribeQuorumWithFollowers(boolean withKip853Rpc, boolean 
withBootstrapSnapshot) throws Exception {
+        int localId = randomReplicaId();
+        int followerId1 = localId + 1;
+        int followerId2 = localId + 2;
+        ReplicaKey local = replicaKey(localId, withBootstrapSnapshot);
+        // local directory id must exist
+        Uuid localDirectoryId = local.directoryId().orElse(Uuid.randomUuid());
+        ReplicaKey bootstrapFollower1 = replicaKey(followerId1, 
withBootstrapSnapshot);
+        // if withBootstrapSnapshot is false, directory ids are still needed 
by the static voter set
+        Uuid followerDirectoryId1 = 
bootstrapFollower1.directoryId().orElse(withKip853Rpc ? Uuid.randomUuid() : 
ReplicaKey.NO_DIRECTORY_ID);
+        ReplicaKey follower1 = ReplicaKey.of(followerId1, 
followerDirectoryId1);
+        ReplicaKey bootstrapFollower2 = replicaKey(followerId2, 
withBootstrapSnapshot);
+        Uuid followerDirectoryId2 = 
bootstrapFollower2.directoryId().orElse(withKip853Rpc ? Uuid.randomUuid() : 
ReplicaKey.NO_DIRECTORY_ID);
+        ReplicaKey follower2 = ReplicaKey.of(followerId2, 
followerDirectoryId2);
+
+        RaftClientTestContext.Builder builder = new 
RaftClientTestContext.Builder(localId, localDirectoryId)
+            .withKip853Rpc(withKip853Rpc);
+
+        if (withBootstrapSnapshot) {
+            VoterSet bootstrapVoterSet = 
VoterSetTest.voterSet(Stream.of(local, bootstrapFollower1, bootstrapFollower2));
+            builder.withBootstrapSnapshot(Optional.of(bootstrapVoterSet));
+        } else {
+            VoterSet staticVoterSet = VoterSetTest.voterSet(Stream.of(local, 
follower1, follower2));
+            builder.withStaticVoters(staticVoterSet);
+        }
+        RaftClientTestContext context = builder.build();
+
+        context.becomeLeader();
+        int epoch = context.currentEpoch();
+
+        // Describe quorum response before any fetches made
+        context.deliverRequest(context.describeQuorumRequest());
+        context.pollUntilResponse();
+        List<ReplicaState> expectedVoterStates = Arrays.asList(
+            new ReplicaState()
+                .setReplicaId(localId)
+                .setReplicaDirectoryId(withBootstrapSnapshot ? 
context.localReplicaKey().directoryId().get() : ReplicaKey.NO_DIRECTORY_ID)
+                .setLogEndOffset(withBootstrapSnapshot ? 3L : 1L)
+                .setLastFetchTimestamp(context.time.milliseconds())
+                .setLastCaughtUpTimestamp(context.time.milliseconds()),
+            new ReplicaState()
+                .setReplicaId(followerId1)
+                .setReplicaDirectoryId(withBootstrapSnapshot ? 
follower1.directoryId().get() : ReplicaKey.NO_DIRECTORY_ID)
+                .setLogEndOffset(-1L)
+                .setLastFetchTimestamp(-1)
+                .setLastCaughtUpTimestamp(-1),
+            new ReplicaState()
+                .setReplicaId(followerId2)
+                .setReplicaDirectoryId(withBootstrapSnapshot ? 
follower2.directoryId().get() : ReplicaKey.NO_DIRECTORY_ID)
+                .setLogEndOffset(-1L)
+                .setLastFetchTimestamp(-1)
+                .setLastCaughtUpTimestamp(-1));
+        context.assertSentDescribeQuorumResponse(localId, epoch, -1L, 
expectedVoterStates, Collections.emptyList());
+
+        context.time.sleep(100);
+        long fetchOffset = withBootstrapSnapshot ? 3L : 1L;
+        long followerFetchTime1 = context.time.milliseconds();
+        context.deliverRequest(context.fetchRequest(1, follower1, fetchOffset, 
epoch, 0));
+        context.pollUntilResponse();
+        long expectedHW = fetchOffset;
+        context.assertSentFetchPartitionResponse(expectedHW, epoch);
+
+        List<String> records = Arrays.asList("foo", "bar");
+        long nextFetchOffset = fetchOffset + records.size();
+        context.client.scheduleAppend(epoch, records);
         context.client.poll();
 
         context.time.sleep(100);
-        long closeFollowerFetchTime = context.time.milliseconds();
-        context.deliverRequest(context.fetchRequest(epoch, closeFollower, 3L, 
epoch, 0));
+        context.deliverRequest(context.describeQuorumRequest());
         context.pollUntilResponse();
-        context.assertSentFetchPartitionResponse(3L, epoch);
+
+        expectedVoterStates.get(0)
+            .setLogEndOffset(nextFetchOffset)
+            .setLastFetchTimestamp(context.time.milliseconds())
+            .setLastCaughtUpTimestamp(context.time.milliseconds());
+        expectedVoterStates.get(1)
+            .setLogEndOffset(fetchOffset)
+            .setLastFetchTimestamp(followerFetchTime1)
+            .setLastCaughtUpTimestamp(followerFetchTime1);
+        context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, 
expectedVoterStates, Collections.emptyList());
+
+        // After follower2 catches up to leader
+        context.time.sleep(100);
+        long followerFetchTime2 = context.time.milliseconds();
+        context.deliverRequest(context.fetchRequest(epoch, follower2, 
nextFetchOffset, epoch, 0));
+        context.pollUntilResponse();
+        expectedHW = nextFetchOffset;
+        context.assertSentFetchPartitionResponse(expectedHW, epoch);
+
+        context.time.sleep(100);
+        context.deliverRequest(context.describeQuorumRequest());
+        context.pollUntilResponse();
+
+        expectedVoterStates.get(0)
+            .setLastFetchTimestamp(context.time.milliseconds())
+            .setLastCaughtUpTimestamp(context.time.milliseconds());
+        expectedVoterStates.get(2)
+            .setLogEndOffset(nextFetchOffset)
+            .setLastFetchTimestamp(followerFetchTime2)
+            .setLastCaughtUpTimestamp(followerFetchTime2);
+        context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, 
expectedVoterStates, Collections.emptyList());
+
+        // Describe quorum returns error if leader loses leadership
+        context.time.sleep(context.checkQuorumTimeoutMs);
+        context.deliverRequest(context.describeQuorumRequest());
+        context.pollUntilResponse();
+        
context.assertSentDescribeQuorumResponse(Errors.NOT_LEADER_OR_FOLLOWER, 0, 0, 
0, Collections.emptyList(), Collections.emptyList());
+    }
+
+    @ParameterizedTest
+    @CsvSource({ "true, true", "true, false", "false, false" })
+    public void testDescribeQuorumWithObserver(boolean withKip853Rpc, boolean 
withBootstrapSnapshot) throws Exception {
+        int localId = randomReplicaId();
+        int followerId = localId + 1;
+        ReplicaKey local = replicaKey(localId, withBootstrapSnapshot);
+        Uuid localDirectoryId = local.directoryId().orElse(Uuid.randomUuid());
+        ReplicaKey bootstrapFollower = replicaKey(followerId, 
withBootstrapSnapshot);
+        Uuid followerDirectoryId = 
bootstrapFollower.directoryId().orElse(withKip853Rpc ? Uuid.randomUuid() : 
ReplicaKey.NO_DIRECTORY_ID);
+        ReplicaKey follower = ReplicaKey.of(followerId, followerDirectoryId);
+
+        RaftClientTestContext.Builder builder = new 
RaftClientTestContext.Builder(localId, localDirectoryId)
+            .withKip853Rpc(withKip853Rpc);
+
+        if (withBootstrapSnapshot) {
+            VoterSet bootstrapVoterSet = 
VoterSetTest.voterSet(Stream.of(local, bootstrapFollower));
+            builder.withBootstrapSnapshot(Optional.of(bootstrapVoterSet));
+        } else {
+            VoterSet staticVoterSet = VoterSetTest.voterSet(Stream.of(local, 
follower));
+            builder.withStaticVoters(staticVoterSet);
+        }
+        RaftClientTestContext context = builder.build();
+
+        context.becomeLeader();
+        int epoch = context.currentEpoch();
+
+        // Update HW to non-initial value
+        context.time.sleep(100);
+        long fetchOffset = withBootstrapSnapshot ? 3L : 1L;
+        long followerFetchTime = context.time.milliseconds();
+        context.deliverRequest(context.fetchRequest(1, follower, fetchOffset, 
epoch, 0));
+        context.pollUntilResponse();
+        long expectedHW = fetchOffset;
+        context.assertSentFetchPartitionResponse(expectedHW, epoch);
 
         // Create observer
-        ReplicaKey observerId = replicaKey(localId + 3, withKip853Rpc);
+        ReplicaKey observer = replicaKey(localId + 2, withKip853Rpc);
+        Uuid observerDirectoryId = 
observer.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID);
         context.time.sleep(100);
         long observerFetchTime = context.time.milliseconds();
-        context.deliverRequest(context.fetchRequest(epoch, observerId, 0L, 0, 
0));
+        context.deliverRequest(context.fetchRequest(epoch, observer, 0L, 0, 
0));
         context.pollUntilResponse();
-        context.assertSentFetchPartitionResponse(3L, epoch);
+        context.assertSentFetchPartitionResponse(expectedHW, epoch);
+
+        context.time.sleep(100);
+        context.deliverRequest(context.describeQuorumRequest());
+        context.pollUntilResponse();
+
+        List<ReplicaState> expectedVoterStates = Arrays.asList(
+            new ReplicaState()
+                .setReplicaId(localId)
+                .setReplicaDirectoryId(withBootstrapSnapshot ? 
localDirectoryId : ReplicaKey.NO_DIRECTORY_ID)
+                // As we are appending the records directly to the log,
+                // the leader end offset hasn't been updated yet.
+                .setLogEndOffset(fetchOffset)
+                .setLastFetchTimestamp(context.time.milliseconds())
+                .setLastCaughtUpTimestamp(context.time.milliseconds()),
+            new ReplicaState()
+                .setReplicaId(follower.id())
+                .setReplicaDirectoryId(withBootstrapSnapshot ? 
followerDirectoryId : ReplicaKey.NO_DIRECTORY_ID)
+                .setLogEndOffset(fetchOffset)
+                .setLastFetchTimestamp(followerFetchTime)
+                .setLastCaughtUpTimestamp(followerFetchTime));
+        List<ReplicaState> expectedObserverStates = singletonList(
+            new ReplicaState()
+                .setReplicaId(observer.id())
+                .setReplicaDirectoryId(observerDirectoryId)
+                .setLogEndOffset(0L)
+                .setLastFetchTimestamp(observerFetchTime)
+                .setLastCaughtUpTimestamp(-1L));
+        context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, 
expectedVoterStates, expectedObserverStates);
+
+        // Update observer fetch state
+        context.time.sleep(100);
+        observerFetchTime = context.time.milliseconds();
+        context.deliverRequest(context.fetchRequest(epoch, observer, 
fetchOffset, epoch, 0));
+        context.pollUntilResponse();
+        context.assertSentFetchPartitionResponse(expectedHW, epoch);
+
+        context.time.sleep(100);
+        context.deliverRequest(context.describeQuorumRequest());
+        context.pollUntilResponse();
+
+        expectedVoterStates.get(0)
+            .setLastFetchTimestamp(context.time.milliseconds())
+            .setLastCaughtUpTimestamp(context.time.milliseconds());
+        expectedObserverStates.get(0)
+            .setLogEndOffset(fetchOffset)
+            .setLastFetchTimestamp(observerFetchTime)
+            .setLastCaughtUpTimestamp(observerFetchTime);
+        context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, 
expectedVoterStates, expectedObserverStates);
+
+        // Observer falls behind
+        context.time.sleep(100);
+        List<String> records = Arrays.asList("foo", "bar");
+        context.client.scheduleAppend(epoch, records);
+        context.client.poll();
+
+        context.deliverRequest(context.describeQuorumRequest());
+        context.pollUntilResponse();
+
+        expectedVoterStates.get(0)
+            .setLogEndOffset(fetchOffset + records.size())
+            .setLastFetchTimestamp(context.time.milliseconds())
+            .setLastCaughtUpTimestamp(context.time.milliseconds());
+        context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, 
expectedVoterStates, expectedObserverStates);
+
+        // Observer is removed due to inactivity
+        long timeToSleep = LeaderState.OBSERVER_SESSION_TIMEOUT_MS;
+        while (timeToSleep > 0) {
+            // Follower needs to continue polling to keep leader alive
+            followerFetchTime = context.time.milliseconds();
+            context.deliverRequest(context.fetchRequest(epoch, follower, 
fetchOffset, epoch, 0));
+            context.pollUntilResponse();
+            context.assertSentFetchPartitionResponse(expectedHW, epoch);
+
+            context.time.sleep(context.checkQuorumTimeoutMs - 1);
+            timeToSleep = timeToSleep - (context.checkQuorumTimeoutMs - 1);
+        }
+        context.deliverRequest(context.describeQuorumRequest());
+        context.pollUntilResponse();
+
+        expectedVoterStates.get(0)
+            .setLastFetchTimestamp(context.time.milliseconds())
+            .setLastCaughtUpTimestamp(context.time.milliseconds());
+        expectedVoterStates.get(1)
+            .setLastFetchTimestamp(followerFetchTime);
+        context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, 
expectedVoterStates, Collections.emptyList());
+
+        // No-op for negative node id
+        context.deliverRequest(context.fetchRequest(epoch, ReplicaKey.of(-1, 
ReplicaKey.NO_DIRECTORY_ID), 0L, 0, 0));
+        context.pollUntilResponse();
+        context.assertSentFetchPartitionResponse(expectedHW, epoch);
+        context.deliverRequest(context.describeQuorumRequest());
+        context.pollUntilResponse();
+
+        expectedVoterStates.get(0)
+            .setLastFetchTimestamp(context.time.milliseconds())
+            .setLastCaughtUpTimestamp(context.time.milliseconds());
+        context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, 
expectedVoterStates, Collections.emptyList());
+    }
+
+    @ParameterizedTest
+    @CsvSource({ "true, true", "true, false", "false, false" })
+    public void testDescribeQuorumNonMonotonicFollowerFetch(boolean 
withKip853Rpc, boolean withBootstrapSnapshot) throws Exception {
+        int localId = randomReplicaId();
+        ReplicaKey local = replicaKey(localId, withBootstrapSnapshot);
+        Uuid localDirectoryId = local.directoryId().orElse(Uuid.randomUuid());
+        int followerId = localId + 1;
+        ReplicaKey bootstrapFollower = replicaKey(followerId, 
withBootstrapSnapshot);
+        Uuid followerDirectoryId = 
bootstrapFollower.directoryId().orElse(withKip853Rpc ? Uuid.randomUuid() : 
ReplicaKey.NO_DIRECTORY_ID);
+        ReplicaKey follower = ReplicaKey.of(followerId, followerDirectoryId);
+
+        RaftClientTestContext.Builder builder = new 
RaftClientTestContext.Builder(localId, localDirectoryId)
+            .withKip853Rpc(withKip853Rpc);
+        if (withBootstrapSnapshot) {
+            VoterSet bootstrapVoterSet = 
VoterSetTest.voterSet(Stream.of(local, bootstrapFollower));
+            builder.withBootstrapSnapshot(Optional.of(bootstrapVoterSet));
+        } else {
+            VoterSet staticVoterSet = VoterSetTest.voterSet(Stream.of(local, 
follower));
+            builder.withStaticVoters(staticVoterSet);
+        }
+        RaftClientTestContext context = builder.build();
+
+        context.becomeLeader();
+        int epoch = context.currentEpoch();
+
+        // Update HW to non-initial value
+        context.time.sleep(100);
+        List<String> batch = Arrays.asList("foo", "bar");
+        context.client.scheduleAppend(epoch, batch);
+        context.client.poll();
+        long fetchOffset = withBootstrapSnapshot ? 5L : 3L;
+        long followerFetchTime = context.time.milliseconds();
+        context.deliverRequest(context.fetchRequest(epoch, follower, 
fetchOffset, epoch, 0));
+        context.pollUntilResponse();
+        long expectedHW = fetchOffset;
+        context.assertSentFetchPartitionResponse(expectedHW, epoch);
 
         context.time.sleep(100);
         context.deliverRequest(context.describeQuorumRequest());
         context.pollUntilResponse();
+        List<ReplicaState> expectedVoterStates = Arrays.asList(
+            new ReplicaState()
+                .setReplicaId(localId)
+                .setReplicaDirectoryId(withBootstrapSnapshot ? 
local.directoryId().get() : ReplicaKey.NO_DIRECTORY_ID)
+                .setLogEndOffset(fetchOffset)
+                .setLastFetchTimestamp(context.time.milliseconds())
+                .setLastCaughtUpTimestamp(context.time.milliseconds()),
+            new ReplicaState()
+                .setReplicaId(follower.id())
+                .setReplicaDirectoryId(withBootstrapSnapshot ? 
follower.directoryId().get() : ReplicaKey.NO_DIRECTORY_ID)
+                .setLogEndOffset(fetchOffset)
+                .setLastFetchTimestamp(followerFetchTime)
+                .setLastCaughtUpTimestamp(followerFetchTime));
+        context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, 
expectedVoterStates, Collections.emptyList());
+
+        // Follower crashes and disk is lost. It fetches an earlier offset to 
rebuild state.
+        // The leader will report an error in the logs, but will not let the 
high watermark rewind
+        context.time.sleep(100);
+        followerFetchTime = context.time.milliseconds();
+        context.deliverRequest(context.fetchRequest(epoch, follower, 
fetchOffset - 1, epoch, 0));
+        context.pollUntilResponse();
+        context.assertSentFetchPartitionResponse(expectedHW, epoch);
+        context.time.sleep(100);
+        context.deliverRequest(context.describeQuorumRequest());
+        context.pollUntilResponse();
+
+        expectedVoterStates.get(0)
+            .setLastFetchTimestamp(context.time.milliseconds())
+            .setLastCaughtUpTimestamp(context.time.milliseconds());
+        expectedVoterStates.get(1)
+            .setLogEndOffset(fetchOffset - batch.size())
+            .setLastFetchTimestamp(followerFetchTime);
+        context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, 
expectedVoterStates, Collections.emptyList());
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = { true, false })
+    public void testStaticVotersIgnoredWithBootstrapSnapshot(boolean 
withKip853Rpc) throws Exception {
+        int localId = randomReplicaId();
+        ReplicaKey local = replicaKey(localId, true);
+        ReplicaKey follower = replicaKey(localId + 1, true);
+        ReplicaKey follower2 = replicaKey(localId + 2, true);
+        // only include one follower in static voter set
+        Set<Integer> staticVoters = Utils.mkSet(localId, follower.id());
+        VoterSet voterSet = VoterSetTest.voterSet(Stream.of(local, follower, 
follower2));
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, local.directoryId().get())
+            .withStaticVoters(staticVoters)
+            .withKip853Rpc(withKip853Rpc)
+            .withBootstrapSnapshot(Optional.of(voterSet))
+            .build();
 
-        // KAFKA-16953 will add support for including the directory id
-        context.assertSentDescribeQuorumResponse(localId, epoch, 3L,
-            Arrays.asList(
-                new ReplicaState()
-                    .setReplicaId(localId)
-                    .setReplicaDirectoryId(ReplicaKey.NO_DIRECTORY_ID)
-                    // As we are appending the records directly to the log,
-                    // the leader end offset hasn't been updated yet.
-                    .setLogEndOffset(3L)
-                    .setLastFetchTimestamp(context.time.milliseconds())
-                    .setLastCaughtUpTimestamp(context.time.milliseconds()),
-                new ReplicaState()
-                    .setReplicaId(laggingFollower.id())
-                    .setReplicaDirectoryId(ReplicaKey.NO_DIRECTORY_ID)
-                    .setLogEndOffset(1L)
-                    .setLastFetchTimestamp(laggingFollowerFetchTime)
-                    .setLastCaughtUpTimestamp(laggingFollowerFetchTime),
-                new ReplicaState()
-                    .setReplicaId(closeFollower.id())
-                    .setReplicaDirectoryId(ReplicaKey.NO_DIRECTORY_ID)
-                    .setLogEndOffset(3L)
-                    .setLastFetchTimestamp(closeFollowerFetchTime)
-                    .setLastCaughtUpTimestamp(closeFollowerFetchTime)),
-            singletonList(
-                new ReplicaState()
-                    .setReplicaId(observerId.id())
-                    .setReplicaDirectoryId(ReplicaKey.NO_DIRECTORY_ID)
-                    .setLogEndOffset(0L)
-                    .setLastFetchTimestamp(observerFetchTime)
-                    .setLastCaughtUpTimestamp(-1L)));
+        context.becomeLeader();
+        int epoch = context.currentEpoch();
+        // check describe quorum response has both followers
+        context.deliverRequest(context.describeQuorumRequest());
+        context.pollUntilResponse();
+        List<ReplicaState> expectedVoterStates = Arrays.asList(
+            new ReplicaState()
+                .setReplicaId(localId)
+                .setReplicaDirectoryId(withKip853Rpc ? 
local.directoryId().get() : ReplicaKey.NO_DIRECTORY_ID)
+                .setLogEndOffset(3L)
+                .setLastFetchTimestamp(context.time.milliseconds())
+                .setLastCaughtUpTimestamp(context.time.milliseconds()),
+            new ReplicaState()
+                .setReplicaId(follower.id())
+                .setReplicaDirectoryId(withKip853Rpc ? 
follower.directoryId().get() : ReplicaKey.NO_DIRECTORY_ID)
+                .setLogEndOffset(-1L)
+                .setLastFetchTimestamp(-1)
+                .setLastCaughtUpTimestamp(-1),
+            new ReplicaState()
+                .setReplicaId(follower2.id())
+                .setReplicaDirectoryId(withKip853Rpc ? 
follower2.directoryId().get() : ReplicaKey.NO_DIRECTORY_ID)
+                .setLogEndOffset(-1L)
+                .setLastFetchTimestamp(-1)
+                .setLastCaughtUpTimestamp(-1));
+        context.assertSentDescribeQuorumResponse(localId, epoch, -1L, 
expectedVoterStates, Collections.emptyList());
     }
 
+
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testLeaderGracefulShutdownTimeout(boolean withKip853Rpc) 
throws Exception {
diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java 
b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
index dfb5469434d..8ce18ec9d44 100644
--- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.raft;
 
 import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.message.DescribeQuorumResponseData;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.raft.internals.BatchAccumulator;
@@ -33,7 +32,6 @@ import org.mockito.Mockito;
 
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
@@ -183,164 +181,6 @@ public class LeaderStateTest {
         );
     }
 
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testLastCaughtUpTimeVoters(boolean withDirectoryId) {
-        ReplicaKey nodeKey1 = replicaKey(1, withDirectoryId);
-        ReplicaKey nodeKey2 = replicaKey(2, withDirectoryId);
-        int currentTime = 1000;
-        int fetchTime = 0;
-        int caughtUpTime = -1;
-        VoterSet voters = localWithRemoteVoterSet(Stream.of(nodeKey1, 
nodeKey2), withDirectoryId);
-        LeaderState<?> state = newLeaderState(voters, 10L);
-
-        assertEquals(Optional.empty(), state.highWatermark());
-        assertFalse(state.updateLocalState(new LogOffsetMetadata(10L), 
voters));
-        assertEquals(mkSet(nodeKey1, nodeKey2), 
state.nonAcknowledgingVoters());
-        assertEquals(Optional.empty(), state.highWatermark());
-
-        // Node 1 falls behind
-        assertFalse(state.updateLocalState(new LogOffsetMetadata(11L), 
voters));
-        assertFalse(state.updateReplicaState(nodeKey1, ++fetchTime, new 
LogOffsetMetadata(10L)));
-        assertEquals(
-            currentTime,
-            describeVoterState(state, localReplicaKey.id(), 
currentTime).lastCaughtUpTimestamp()
-        );
-        assertEquals(
-            caughtUpTime,
-            describeVoterState(state, nodeKey1.id(), 
currentTime).lastCaughtUpTimestamp()
-        );
-
-        // Node 1 catches up to leader
-        assertTrue(state.updateReplicaState(nodeKey1, ++fetchTime, new 
LogOffsetMetadata(11L)));
-        caughtUpTime = fetchTime;
-        assertEquals(
-            currentTime,
-            describeVoterState(state, localReplicaKey.id(), 
currentTime).lastCaughtUpTimestamp()
-        );
-        assertEquals(
-            caughtUpTime,
-            describeVoterState(state, nodeKey1.id(), 
currentTime).lastCaughtUpTimestamp()
-        );
-
-        // Node 1 falls behind
-        assertFalse(state.updateLocalState(new LogOffsetMetadata(100L), 
voters));
-        assertTrue(state.updateReplicaState(nodeKey1, ++fetchTime, new 
LogOffsetMetadata(50L)));
-        assertEquals(currentTime, describeVoterState(state, 
localReplicaKey.id(), currentTime).lastCaughtUpTimestamp());
-        assertEquals(
-            caughtUpTime,
-            describeVoterState(state, nodeKey1.id(), 
currentTime).lastCaughtUpTimestamp()
-        );
-
-        // Node 1 catches up to the last fetch offset
-        int prevFetchTime = fetchTime;
-        assertFalse(state.updateLocalState(new LogOffsetMetadata(200L), 
voters));
-        assertTrue(state.updateReplicaState(nodeKey1, ++fetchTime, new 
LogOffsetMetadata(100L)));
-        caughtUpTime = prevFetchTime;
-        assertEquals(
-            currentTime,
-            describeVoterState(state, localReplicaKey.id(), 
currentTime).lastCaughtUpTimestamp()
-        );
-        assertEquals(
-            caughtUpTime,
-            describeVoterState(state, nodeKey1.id(), 
currentTime).lastCaughtUpTimestamp()
-        );
-
-        // Node2 has never caught up to leader
-        assertEquals(
-            -1L,
-            describeVoterState(state, nodeKey2.id(), 
currentTime).lastCaughtUpTimestamp()
-        );
-        assertFalse(state.updateLocalState(new LogOffsetMetadata(300L), 
voters));
-        assertTrue(state.updateReplicaState(nodeKey2, ++fetchTime, new 
LogOffsetMetadata(200L)));
-        assertEquals(
-            -1L,
-            describeVoterState(state, nodeKey2.id(), 
currentTime).lastCaughtUpTimestamp()
-        );
-        assertTrue(state.updateReplicaState(nodeKey2, ++fetchTime, new 
LogOffsetMetadata(250L)));
-        assertEquals(
-            -1L,
-            describeVoterState(state, nodeKey2.id(), 
currentTime).lastCaughtUpTimestamp()
-        );
-    }
-
-    @ParameterizedTest
-    @ValueSource(booleans = { true, false })
-    public void testLastCaughtUpTimeObserver(boolean withDirectoryId) {
-        ReplicaKey nodeKey1 = replicaKey(1, withDirectoryId);
-        int currentTime = 1000;
-        int fetchTime = 0;
-        int caughtUpTime = -1;
-
-        VoterSet voters = VoterSetTest.voterSet(Stream.of(localReplicaKey));
-        LeaderState<?> state = newLeaderState(voters, 5L);
-
-        assertEquals(Optional.empty(), state.highWatermark());
-        assertEquals(emptySet(), state.nonAcknowledgingVoters());
-
-        // Node 1 falls behind
-        assertTrue(state.updateLocalState(new LogOffsetMetadata(11L), voters));
-        assertFalse(state.updateReplicaState(nodeKey1, ++fetchTime, new 
LogOffsetMetadata(10L)));
-        assertEquals(
-            currentTime,
-            describeVoterState(state, localReplicaKey.id(), 
currentTime).lastCaughtUpTimestamp()
-        );
-        assertEquals(
-            caughtUpTime,
-            describeObserverState(state, nodeKey1.id(), 
currentTime).lastCaughtUpTimestamp()
-        );
-
-        // Node 1 catches up to leader
-        assertFalse(state.updateReplicaState(nodeKey1, ++fetchTime, new 
LogOffsetMetadata(11L)));
-        caughtUpTime = fetchTime;
-        assertEquals(
-            currentTime,
-            describeVoterState(state, localReplicaKey.id(), 
currentTime).lastCaughtUpTimestamp()
-        );
-        assertEquals(
-            caughtUpTime,
-            describeObserverState(state, nodeKey1.id(), 
currentTime).lastCaughtUpTimestamp()
-        );
-
-        // Node 1 falls behind
-        assertTrue(state.updateLocalState(new LogOffsetMetadata(100L), 
voters));
-        assertFalse(state.updateReplicaState(nodeKey1, ++fetchTime, new 
LogOffsetMetadata(50L)));
-        assertEquals(
-            currentTime,
-            describeVoterState(state, localReplicaKey.id(), 
currentTime).lastCaughtUpTimestamp()
-        );
-        assertEquals(
-            caughtUpTime,
-            describeObserverState(state, nodeKey1.id(), 
currentTime).lastCaughtUpTimestamp()
-        );
-
-        // Node 1 catches up to the last fetch offset
-        int prevFetchTime = fetchTime;
-        assertTrue(state.updateLocalState(new LogOffsetMetadata(200L), 
voters));
-        assertFalse(state.updateReplicaState(nodeKey1, ++fetchTime, new 
LogOffsetMetadata(102L)));
-        caughtUpTime = prevFetchTime;
-        assertEquals(
-            currentTime,
-            describeVoterState(state, localReplicaKey.id(), 
currentTime).lastCaughtUpTimestamp()
-        );
-        assertEquals(
-            caughtUpTime,
-            describeObserverState(state, nodeKey1.id(), 
currentTime).lastCaughtUpTimestamp()
-        );
-
-        // Node 1 catches up to leader
-        assertFalse(state.updateReplicaState(nodeKey1, ++fetchTime, new 
LogOffsetMetadata(200L)));
-        caughtUpTime = fetchTime;
-        assertEquals(
-            currentTime,
-            describeVoterState(state, localReplicaKey.id(), 
currentTime).lastCaughtUpTimestamp()
-        );
-        assertEquals(
-            caughtUpTime,
-            describeObserverState(state, nodeKey1.id(), 
currentTime).lastCaughtUpTimestamp()
-        );
-    }
-
     @Test
     public void testIdempotentEndOffsetUpdate() {
         VoterSet voters = VoterSetTest.voterSet(Stream.of(localReplicaKey));
@@ -560,7 +400,6 @@ public class LeaderStateTest {
         // Follower crashes and disk is lost. It fetches an earlier offset to 
rebuild state.
         // The leader will report an error in the logs, but will not let the 
high watermark rewind
         assertFalse(state.updateReplicaState(nodeKey1, time.milliseconds(), 
new LogOffsetMetadata(5L)));
-        assertEquals(5L, describeVoterState(state, nodeKey1.id(), 
time.milliseconds()).logEndOffset());
         assertEquals(Optional.of(new LogOffsetMetadata(10L)), 
state.highWatermark());
     }
 
@@ -587,301 +426,6 @@ public class LeaderStateTest {
         );
     }
 
-    @Test
-    public void testDescribeQuorumWithSingleVoter() {
-        MockTime time = new MockTime();
-        long leaderStartOffset = 10L;
-        long leaderEndOffset = 15L;
-
-        VoterSet voters = VoterSetTest.voterSet(Stream.of(localReplicaKey));
-        LeaderState<?> state = newLeaderState(voters, leaderStartOffset);
-
-        // Until we have updated local state, high watermark should be 
uninitialized
-        assertEquals(Optional.empty(), state.highWatermark());
-        DescribeQuorumResponseData.PartitionData partitionData = 
state.describeQuorum(time.milliseconds());
-        assertEquals(-1, partitionData.highWatermark());
-        assertEquals(localReplicaKey.id(), partitionData.leaderId());
-        assertEquals(epoch, partitionData.leaderEpoch());
-        assertEquals(Collections.emptyList(), partitionData.observers());
-        assertEquals(1, partitionData.currentVoters().size());
-        // KAFKA-16953 will add support for including the directory id
-        assertEquals(
-            new DescribeQuorumResponseData.ReplicaState()
-                .setReplicaId(localReplicaKey.id())
-                .setLogEndOffset(-1)
-                .setLastFetchTimestamp(time.milliseconds())
-                .setLastCaughtUpTimestamp(time.milliseconds()),
-            partitionData.currentVoters().get(0)
-        );
-
-
-        // Now update the high watermark and verify the describe output
-        assertTrue(state.updateLocalState(new 
LogOffsetMetadata(leaderEndOffset), voters));
-        assertEquals(Optional.of(new LogOffsetMetadata(leaderEndOffset)), 
state.highWatermark());
-
-        time.sleep(500);
-
-        partitionData = state.describeQuorum(time.milliseconds());
-        assertEquals(leaderEndOffset, partitionData.highWatermark());
-        assertEquals(localReplicaKey.id(), partitionData.leaderId());
-        assertEquals(epoch, partitionData.leaderEpoch());
-        assertEquals(Collections.emptyList(), partitionData.observers());
-        assertEquals(1, partitionData.currentVoters().size());
-        // KAFKA-16953 will add support for including the directory id
-        assertEquals(
-            new DescribeQuorumResponseData.ReplicaState()
-                .setReplicaId(localReplicaKey.id())
-                .setLogEndOffset(leaderEndOffset)
-                .setLastFetchTimestamp(time.milliseconds())
-                .setLastCaughtUpTimestamp(time.milliseconds()),
-            partitionData.currentVoters().get(0)
-        );
-    }
-
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testDescribeQuorumWithMultipleVoters(boolean withDirectoryId) {
-        MockTime time = new MockTime();
-        ReplicaKey activeFollowerKey = replicaKey(1, withDirectoryId);
-        ReplicaKey inactiveFollowerKey = replicaKey(2, withDirectoryId);
-        long leaderStartOffset = 10L;
-        long leaderEndOffset = 15L;
-
-        VoterSet voters = localWithRemoteVoterSet(
-            Stream.of(activeFollowerKey, inactiveFollowerKey),
-            withDirectoryId
-        );
-        LeaderState<?> state = newLeaderState(voters, leaderStartOffset);
-
-        assertFalse(state.updateLocalState(new 
LogOffsetMetadata(leaderEndOffset), voters));
-        assertEquals(Optional.empty(), state.highWatermark());
-
-        long activeFollowerFetchTimeMs = time.milliseconds();
-        assertTrue(
-            state.updateReplicaState(
-                activeFollowerKey,
-                activeFollowerFetchTimeMs,
-                new LogOffsetMetadata(leaderEndOffset)
-            )
-        );
-        assertEquals(Optional.of(new LogOffsetMetadata(leaderEndOffset)), 
state.highWatermark());
-
-        time.sleep(500);
-
-        DescribeQuorumResponseData.PartitionData partitionData = 
state.describeQuorum(time.milliseconds());
-        assertEquals(leaderEndOffset, partitionData.highWatermark());
-        assertEquals(localReplicaKey.id(), partitionData.leaderId());
-        assertEquals(epoch, partitionData.leaderEpoch());
-        assertEquals(Collections.emptyList(), partitionData.observers());
-
-        List<DescribeQuorumResponseData.ReplicaState> voterStates = 
partitionData.currentVoters();
-        assertEquals(3, voterStates.size());
-
-        DescribeQuorumResponseData.ReplicaState leaderState = 
findReplicaOrFail(
-            localReplicaKey.id(),
-            partitionData.currentVoters()
-        );
-        // KAFKA-16953 will add support for including the directory id
-        assertEquals(
-            new DescribeQuorumResponseData.ReplicaState()
-                .setReplicaId(localReplicaKey.id())
-                .setLogEndOffset(leaderEndOffset)
-                .setLastFetchTimestamp(time.milliseconds())
-                .setLastCaughtUpTimestamp(time.milliseconds()),
-            leaderState
-        );
-
-        DescribeQuorumResponseData.ReplicaState activeFollowerState = 
findReplicaOrFail(
-            activeFollowerKey.id(),
-            partitionData.currentVoters()
-        );
-        assertEquals(
-            new DescribeQuorumResponseData.ReplicaState()
-                .setReplicaId(activeFollowerKey.id())
-                .setLogEndOffset(leaderEndOffset)
-                .setLastFetchTimestamp(activeFollowerFetchTimeMs)
-                .setLastCaughtUpTimestamp(activeFollowerFetchTimeMs),
-            activeFollowerState
-        );
-
-        DescribeQuorumResponseData.ReplicaState inactiveFollowerState = 
findReplicaOrFail(
-            inactiveFollowerKey.id(),
-            partitionData.currentVoters()
-        );
-        assertEquals(
-            new DescribeQuorumResponseData.ReplicaState()
-                .setReplicaId(inactiveFollowerKey.id())
-                .setLogEndOffset(-1)
-                .setLastFetchTimestamp(-1)
-                .setLastCaughtUpTimestamp(-1),
-            inactiveFollowerState
-        );
-    }
-
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testDescribeQuorumWithObservers(boolean withDirectoryId) {
-        MockTime time = new MockTime();
-
-        ReplicaKey observerKey = replicaKey(10, withDirectoryId);
-        long epochStartOffset = 10L;
-
-        VoterSet voters = localWithRemoteVoterSet(Stream.empty(), 
withDirectoryId);
-        LeaderState<?> state = newLeaderState(voters, epochStartOffset);
-        assertTrue(state.updateLocalState(new 
LogOffsetMetadata(epochStartOffset + 1), voters));
-        assertEquals(Optional.of(new LogOffsetMetadata(epochStartOffset + 1)), 
state.highWatermark());
-
-        time.sleep(500);
-        long observerFetchTimeMs = time.milliseconds();
-        assertFalse(
-            state.updateReplicaState(
-                observerKey,
-                observerFetchTimeMs,
-                new LogOffsetMetadata(epochStartOffset + 1)
-            )
-        );
-
-        time.sleep(500);
-        DescribeQuorumResponseData.PartitionData partitionData = 
state.describeQuorum(time.milliseconds());
-        assertEquals(epochStartOffset + 1, partitionData.highWatermark());
-        assertEquals(localReplicaKey.id(), partitionData.leaderId());
-        assertEquals(epoch, partitionData.leaderEpoch());
-
-        assertEquals(1, partitionData.currentVoters().size());
-        assertEquals(localReplicaKey.id(), 
partitionData.currentVoters().get(0).replicaId());
-        // KAFKA-16953 will add support for including the directory id
-        assertEquals(
-            ReplicaKey.NO_DIRECTORY_ID,
-            partitionData.currentVoters().get(0).replicaDirectoryId()
-        );
-
-        List<DescribeQuorumResponseData.ReplicaState> observerStates = 
partitionData.observers();
-        assertEquals(1, observerStates.size());
-
-        DescribeQuorumResponseData.ReplicaState observerState = 
observerStates.get(0);
-        // KAFKA-16953 will add support for including the directory id
-        assertEquals(new DescribeQuorumResponseData.ReplicaState()
-                .setReplicaId(observerKey.id())
-                .setLogEndOffset(epochStartOffset + 1)
-                .setLastFetchTimestamp(observerFetchTimeMs)
-                .setLastCaughtUpTimestamp(observerFetchTimeMs),
-            observerState);
-    }
-
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testDescribeQuorumWithVotersAndObservers(boolean 
withDirectoryId) {
-        MockTime time = new MockTime();
-        ReplicaKey nodeKey1 = replicaKey(1, withDirectoryId);
-        ReplicaKey nodeKey2 = replicaKey(2, withDirectoryId);
-        long epochStartOffset = 10L;
-
-        VoterSet voters = localWithRemoteVoterSet(Stream.of(nodeKey1, 
nodeKey2), withDirectoryId);
-        LeaderState<?> state = newLeaderState(voters, epochStartOffset);
-
-
-        assertFalse(state.updateLocalState(new 
LogOffsetMetadata(epochStartOffset + 1), voters));
-        assertTrue(state.updateReplicaState(nodeKey2, 0, new 
LogOffsetMetadata(epochStartOffset + 1)));
-        assertEquals(Optional.of(new LogOffsetMetadata(epochStartOffset + 1)), 
state.highWatermark());
-
-        // node1 becomes an observer
-        long fetchTimeMs = time.milliseconds();
-        assertFalse(state.updateReplicaState(nodeKey1, fetchTimeMs, new 
LogOffsetMetadata(epochStartOffset + 1)));
-        VoterSet votersWithoutNode1 = voters.removeVoter(nodeKey1).get();
-        state.updateLocalState(new LogOffsetMetadata(epochStartOffset + 5), 
votersWithoutNode1);
-
-        time.sleep(500);
-        DescribeQuorumResponseData.PartitionData partitionData = 
state.describeQuorum(time.milliseconds());
-        assertEquals(epochStartOffset + 1, partitionData.highWatermark());
-        assertEquals(localReplicaKey.id(), partitionData.leaderId());
-        assertEquals(epoch, partitionData.leaderEpoch());
-        DescribeQuorumResponseData.ReplicaState observer = 
partitionData.observers().get(0);
-        assertEquals(nodeKey1.id(), observer.replicaId());
-        // KAFKA-16953 will add support for including the directory id
-        assertEquals(
-            ReplicaKey.NO_DIRECTORY_ID,
-            observer.replicaDirectoryId()
-        );
-        assertEquals(epochStartOffset + 1, observer.logEndOffset());
-        assertEquals(2, partitionData.currentVoters().size());
-
-        // node1 catches up with leader, HW should not change
-        time.sleep(500);
-        fetchTimeMs = time.milliseconds();
-        assertFalse(state.updateReplicaState(nodeKey1, fetchTimeMs, new 
LogOffsetMetadata(epochStartOffset + 5)));
-        assertEquals(Optional.of(new LogOffsetMetadata(epochStartOffset + 1)), 
state.highWatermark());
-
-        // node1 becomes a voter again, HW should change
-        assertTrue(state.updateLocalState(new 
LogOffsetMetadata(epochStartOffset + 10), voters));
-
-        time.sleep(500);
-        partitionData = state.describeQuorum(time.milliseconds());
-        assertEquals(epochStartOffset + 5, partitionData.highWatermark());
-        assertEquals(localReplicaKey.id(), partitionData.leaderId());
-        assertEquals(epoch, partitionData.leaderEpoch());
-        assertEquals(Collections.emptyList(), partitionData.observers());
-        assertEquals(3, partitionData.currentVoters().size());
-        DescribeQuorumResponseData.ReplicaState node1State = partitionData
-            .currentVoters()
-            .stream()
-            .filter(replicaState -> replicaState.replicaId() == nodeKey1.id())
-            .findFirst()
-            .get();
-        assertEquals(epochStartOffset + 5, node1State.logEndOffset());
-        assertEquals(fetchTimeMs, node1State.lastFetchTimestamp());
-    }
-
-    @Test
-    public void testClearInactiveObserversIgnoresLeader() {
-        MockTime time = new MockTime();
-        ReplicaKey followerKey = ReplicaKey.of(1, Uuid.randomUuid());
-        ReplicaKey observerKey = ReplicaKey.of(10, Uuid.randomUuid());
-        long epochStartOffset = 10L;
-
-        VoterSet voters = localWithRemoteVoterSet(Stream.of(followerKey), 
true);
-        LeaderState<?> state = newLeaderState(voters, epochStartOffset);
-
-        assertFalse(state.updateLocalState(new 
LogOffsetMetadata(epochStartOffset + 1), voters));
-        assertTrue(state.updateReplicaState(followerKey, time.milliseconds(), 
new LogOffsetMetadata(epochStartOffset + 1)));
-
-        // observer is returned since its lastFetchTimestamp is within 
OBSERVER_SESSION_TIMEOUT_MS
-        time.sleep(500);
-        long observerFetchTimeMs = time.milliseconds();
-        assertFalse(state.updateReplicaState(observerKey, observerFetchTimeMs, 
new LogOffsetMetadata(epochStartOffset + 1)));
-
-        time.sleep(500);
-        DescribeQuorumResponseData.PartitionData partitionData = 
state.describeQuorum(time.milliseconds());
-        assertEquals(epochStartOffset + 1, partitionData.highWatermark());
-        assertEquals(localReplicaKey.id(), partitionData.leaderId());
-        assertEquals(2, partitionData.currentVoters().size());
-        assertEquals(1, partitionData.observers().size());
-        assertEquals(observerKey.id(), 
partitionData.observers().get(0).replicaId());
-
-        // observer is not returned once its lastFetchTimestamp surpasses 
OBSERVER_SESSION_TIMEOUT_MS
-        time.sleep(LeaderState.OBSERVER_SESSION_TIMEOUT_MS);
-        partitionData = state.describeQuorum(time.milliseconds());
-        assertEquals(epochStartOffset + 1, partitionData.highWatermark());
-        assertEquals(localReplicaKey.id(), partitionData.leaderId());
-        assertEquals(2, partitionData.currentVoters().size());
-        assertEquals(0, partitionData.observers().size());
-
-        // leader becomes observer
-        VoterSet votersWithoutLeader = 
voters.removeVoter(localReplicaKey).get();
-        assertFalse(state.updateLocalState(new 
LogOffsetMetadata(epochStartOffset + 10), votersWithoutLeader));
-
-        // leader should be returned in describe quorum output
-        time.sleep(LeaderState.OBSERVER_SESSION_TIMEOUT_MS);
-        long describeQuorumCalledTime = time.milliseconds();
-        partitionData = state.describeQuorum(describeQuorumCalledTime);
-        assertEquals(epochStartOffset + 1, partitionData.highWatermark());
-        assertEquals(localReplicaKey.id(), partitionData.leaderId());
-        assertEquals(1, partitionData.currentVoters().size());
-        assertEquals(1, partitionData.observers().size());
-        DescribeQuorumResponseData.ReplicaState observer = 
partitionData.observers().get(0);
-        assertEquals(localReplicaKey.id(), observer.replicaId());
-        assertEquals(describeQuorumCalledTime, observer.lastFetchTimestamp());
-    }
-
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     public void testCheckQuorum(boolean withDirectoryId) {
@@ -1026,63 +570,6 @@ public class LeaderStateTest {
         assertEquals(Optional.of(new LogOffsetMetadata(13L)), 
state.highWatermark());
     }
 
-    @Test
-    public void testNoOpForNegativeRemoteNodeId() {
-        MockTime time = new MockTime();
-        int replicaId = -1;
-        long epochStartOffset = 10L;
-
-        LeaderState<?> state = newLeaderState(
-            VoterSetTest.voterSet(Stream.of(localReplicaKey)),
-            epochStartOffset
-        );
-        assertFalse(
-            state.updateReplicaState(
-                ReplicaKey.of(replicaId, ReplicaKey.NO_DIRECTORY_ID),
-                0,
-                new LogOffsetMetadata(epochStartOffset)
-            )
-        );
-
-        DescribeQuorumResponseData.PartitionData partitionData = 
state.describeQuorum(time.milliseconds());
-        List<DescribeQuorumResponseData.ReplicaState> observerStates = 
partitionData.observers();
-        assertEquals(Collections.emptyList(), observerStates);
-    }
-
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testObserverStateExpiration(boolean withDirectoryId) {
-        MockTime time = new MockTime();
-        ReplicaKey observerKey = replicaKey(10, withDirectoryId);
-        long epochStartOffset = 10L;
-
-        LeaderState<?> state = newLeaderState(
-            VoterSetTest.voterSet(Stream.of(localReplicaKey)),
-            epochStartOffset
-        );
-
-        state.updateReplicaState(
-            observerKey,
-            time.milliseconds(),
-            new LogOffsetMetadata(epochStartOffset)
-        );
-        DescribeQuorumResponseData.PartitionData partitionData = 
state.describeQuorum(time.milliseconds());
-        List<DescribeQuorumResponseData.ReplicaState> observerStates = 
partitionData.observers();
-        assertEquals(1, observerStates.size());
-
-        DescribeQuorumResponseData.ReplicaState observerState = 
observerStates.get(0);
-        assertEquals(observerKey.id(), observerState.replicaId());
-        // KAFKA-16953 will add support for including the directory id
-        assertEquals(
-            ReplicaKey.NO_DIRECTORY_ID,
-            observerState.replicaDirectoryId()
-        );
-
-        time.sleep(LeaderState.OBSERVER_SESSION_TIMEOUT_MS);
-        partitionData = state.describeQuorum(time.milliseconds());
-        assertEquals(Collections.emptyList(), partitionData.observers());
-    }
-
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     public void testGrantVote(boolean isLogUpToDate) {
@@ -1147,36 +634,6 @@ public class LeaderStateTest {
         }
     }
 
-    private DescribeQuorumResponseData.ReplicaState describeVoterState(
-        LeaderState<?> state,
-        int voterId,
-        long currentTimeMs
-    ) {
-        DescribeQuorumResponseData.PartitionData partitionData = 
state.describeQuorum(currentTimeMs);
-        return findReplicaOrFail(voterId, partitionData.currentVoters());
-    }
-
-    private DescribeQuorumResponseData.ReplicaState describeObserverState(
-        LeaderState<?> state,
-        int observerId,
-        long currentTimeMs
-    ) {
-        DescribeQuorumResponseData.PartitionData partitionData = 
state.describeQuorum(currentTimeMs);
-        return findReplicaOrFail(observerId, partitionData.observers());
-    }
-
-    private DescribeQuorumResponseData.ReplicaState findReplicaOrFail(
-        int replicaId,
-        List<DescribeQuorumResponseData.ReplicaState> replicas
-    ) {
-        return replicas.stream()
-            .filter(observer -> observer.replicaId() == replicaId)
-            .findFirst()
-            .orElseThrow(() -> new AssertionError(
-                "Failed to find expected replica state for replica " + 
replicaId
-            ));
-    }
-
     private ReplicaKey replicaKey(int id, boolean withDirectoryId) {
         Uuid directoryId = withDirectoryId ? Uuid.randomUuid() : 
ReplicaKey.NO_DIRECTORY_ID;
         return ReplicaKey.of(id, directoryId);
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java 
b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index cab37a3241e..695af19f7b1 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -650,19 +650,41 @@ public final class RaftClientTestContext {
         long highWatermark,
         List<ReplicaState> voterStates,
         List<ReplicaState> observerStates
+    ) {
+        assertSentDescribeQuorumResponse(Errors.NONE, leaderId, leaderEpoch, 
highWatermark, voterStates, observerStates);
+    }
+
+    void assertSentDescribeQuorumResponse(
+        Errors error,
+        int leaderId,
+        int leaderEpoch,
+        long highWatermark,
+        List<ReplicaState> voterStates,
+        List<ReplicaState> observerStates
     ) {
         DescribeQuorumResponseData response = collectDescribeQuorumResponse();
 
         DescribeQuorumResponseData.PartitionData partitionData = new 
DescribeQuorumResponseData.PartitionData()
-            .setErrorCode(Errors.NONE.code())
+            .setErrorCode(error.code())
             .setLeaderId(leaderId)
             .setLeaderEpoch(leaderEpoch)
             .setHighWatermark(highWatermark)
             .setCurrentVoters(voterStates)
             .setObservers(observerStates);
 
-        // KAFKA-16953 will add support for including the node listeners in 
the node collection
-        DescribeQuorumResponseData.NodeCollection nodes = new 
DescribeQuorumResponseData.NodeCollection();
+        if (!error.equals(Errors.NONE)) {
+            partitionData.setErrorMessage(error.message());
+        }
+
+        DescribeQuorumResponseData.NodeCollection nodes = new 
DescribeQuorumResponseData.NodeCollection(0);
+        if (describeQuorumRpcVersion() >= 2) {
+            nodes = new 
DescribeQuorumResponseData.NodeCollection(voterStates.size());
+            for (ReplicaState voterState : voterStates) {
+                nodes.add(new DescribeQuorumResponseData.Node()
+                    .setNodeId(voterState.replicaId())
+                    
.setListeners(startingVoters.listeners(voterState.replicaId()).toDescribeQuorumResponseListeners()));
+            }
+        }
 
         DescribeQuorumResponseData expectedResponse = 
DescribeQuorumResponse.singletonResponse(
             metadataPartition,
@@ -680,6 +702,7 @@ public final class RaftClientTestContext {
             .sorted(Comparator.comparingInt(ReplicaState::replicaId))
             .collect(Collectors.toList());
         
response.topics().get(0).partitions().get(0).setCurrentVoters(sortedVoters);
+        
response.nodes().sort(Comparator.comparingInt(DescribeQuorumResponseData.Node::nodeId));
 
         assertEquals(expectedResponse, response);
     }

Reply via email to