This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new aecaf444756 KAFKA-16520: Support KIP-853 in DescribeQuorum (#16106)
aecaf444756 is described below

commit aecaf4447561edd8da9f06e3abdf46f382dc9d89
Author: Nikolay <[email protected]>
AuthorDate: Tue Jun 11 20:01:35 2024 +0300

    KAFKA-16520: Support KIP-853 in DescribeQuorum (#16106)
    
    Add support for KIP-953 KRaft Quorum reconfiguration in the DescribeQuorum 
request and response.
    Also add support to AdminClient.describeQuorum, so that users will be able 
to find the current set of
    quorum nodes, as well as their directories, via these RPCs.
    
    Reviewers: Luke Chen <[email protected]>, Colin P. McCabe 
<[email protected]>, Andrew Schofield <[email protected]>
---
 .../kafka/clients/admin/KafkaAdminClient.java      |  20 +-
 .../org/apache/kafka/clients/admin/QuorumInfo.java |  76 ++++++-
 .../common/requests/DescribeQuorumRequest.java     |   4 +-
 .../common/requests/DescribeQuorumResponse.java    |  13 +-
 .../common/message/DescribeQuorumRequest.json      |   4 +-
 .../common/message/DescribeQuorumResponse.json     |  24 +-
 .../kafka/clients/admin/KafkaAdminClientTest.java  |  23 +-
 .../unit/kafka/server/ApiVersionsRequestTest.scala |   4 +-
 .../kafka/server/DescribeQuorumRequestTest.scala   |   2 +
 .../scala/unit/kafka/server/KafkaApisTest.scala    |   3 +-
 .../org/apache/kafka/raft/KafkaRaftClient.java     |  10 +-
 .../apache/kafka/raft/KafkaRaftClientDriver.java   |   1 +
 .../java/org/apache/kafka/raft/LeaderState.java    |  84 +++++--
 .../java/org/apache/kafka/raft/QuorumState.java    |   3 +-
 .../java/org/apache/kafka/raft/RaftMessage.java    |   3 +-
 .../java/org/apache/kafka/raft/RaftRequest.java    |  17 +-
 .../java/org/apache/kafka/raft/RaftResponse.java   |   5 +
 .../kafka/raft/internals/BlockingMessageQueue.java |   5 +
 .../org/apache/kafka/raft/internals/VoterSet.java  |   6 +-
 .../apache/kafka/raft/KafkaNetworkChannelTest.java |   1 +
 .../org/apache/kafka/raft/KafkaRaftClientTest.java |   7 +
 .../org/apache/kafka/raft/LeaderStateTest.java     | 243 ++++++++++++---------
 .../apache/kafka/raft/RaftClientTestContext.java   |  19 +-
 .../apache/kafka/raft/RaftEventSimulationTest.java |   2 +-
 .../kafka/raft/internals/KafkaRaftMetricsTest.java |   4 +-
 25 files changed, 418 insertions(+), 165 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index c59cccf67c4..d7d525e4431 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -4414,12 +4414,13 @@ public class KafkaAdminClient extends AdminClient {
             private QuorumInfo.ReplicaState 
translateReplicaState(DescribeQuorumResponseData.ReplicaState replica) {
                 return new QuorumInfo.ReplicaState(
                         replica.replicaId(),
+                        replica.replicaDirectoryId() == null ? Uuid.ZERO_UUID 
: replica.replicaDirectoryId(),
                         replica.logEndOffset(),
                         replica.lastFetchTimestamp() == -1 ? 
OptionalLong.empty() : OptionalLong.of(replica.lastFetchTimestamp()),
                         replica.lastCaughtUpTimestamp() == -1 ? 
OptionalLong.empty() : OptionalLong.of(replica.lastCaughtUpTimestamp()));
             }
 
-            private QuorumInfo createQuorumResult(final 
DescribeQuorumResponseData.PartitionData partition) {
+            private QuorumInfo createQuorumResult(final 
DescribeQuorumResponseData.PartitionData partition, 
DescribeQuorumResponseData.NodeCollection nodeCollection) {
                 List<QuorumInfo.ReplicaState> voters = 
partition.currentVoters().stream()
                     .map(this::translateReplicaState)
                     .collect(Collectors.toList());
@@ -4428,12 +4429,21 @@ public class KafkaAdminClient extends AdminClient {
                     .map(this::translateReplicaState)
                     .collect(Collectors.toList());
 
+                Map<Integer, QuorumInfo.Node> nodes = 
nodeCollection.stream().map(n -> {
+                    List<RaftVoterEndpoint> endpoints = n.listeners().stream()
+                        .map(l -> new RaftVoterEndpoint(l.name(), l.host(), 
l.port()))
+                        .collect(Collectors.toList());
+
+                    return new QuorumInfo.Node(n.nodeId(), endpoints);
+                }).collect(Collectors.toMap(QuorumInfo.Node::nodeId, 
Function.identity()));
+
                 return new QuorumInfo(
                     partition.leaderId(),
                     partition.leaderEpoch(),
                     partition.highWatermark(),
                     voters,
-                    observers
+                    observers,
+                    nodes
                 );
             }
 
@@ -4447,7 +4457,7 @@ public class KafkaAdminClient extends AdminClient {
             void handleResponse(AbstractResponse response) {
                 final DescribeQuorumResponse quorumResponse = 
(DescribeQuorumResponse) response;
                 if (quorumResponse.data().errorCode() != Errors.NONE.code()) {
-                    throw 
Errors.forCode(quorumResponse.data().errorCode()).exception();
+                    throw 
Errors.forCode(quorumResponse.data().errorCode()).exception(quorumResponse.data().errorMessage());
                 }
                 if (quorumResponse.data().topics().size() != 1) {
                     String msg = String.format("DescribeMetadataQuorum 
received %d topics when 1 was expected",
@@ -4476,9 +4486,9 @@ public class KafkaAdminClient extends AdminClient {
                     throw new UnknownServerException(msg);
                 }
                 if (partition.errorCode() != Errors.NONE.code()) {
-                    throw Errors.forCode(partition.errorCode()).exception();
+                    throw 
Errors.forCode(partition.errorCode()).exception(partition.errorMessage());
                 }
-                future.complete(createQuorumResult(partition));
+                future.complete(createQuorumResult(partition, 
quorumResponse.data().nodes()));
             }
 
             @Override
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java
index f9e4f8c11c9..5264b6f6aae 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java
@@ -16,7 +16,10 @@
  */
 package org.apache.kafka.clients.admin;
 
+import org.apache.kafka.common.Uuid;
+
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.OptionalLong;
 
@@ -29,19 +32,22 @@ public class QuorumInfo {
     private final long highWatermark;
     private final List<ReplicaState> voters;
     private final List<ReplicaState> observers;
+    private final Map<Integer, Node> nodes;
 
     QuorumInfo(
         int leaderId,
         long leaderEpoch,
         long highWatermark,
         List<ReplicaState> voters,
-        List<ReplicaState> observers
+        List<ReplicaState> observers,
+        Map<Integer, Node> nodes
     ) {
         this.leaderId = leaderId;
         this.leaderEpoch = leaderEpoch;
         this.highWatermark = highWatermark;
         this.voters = voters;
         this.observers = observers;
+        this.nodes = nodes;
     }
 
     public int leaderId() {
@@ -64,6 +70,13 @@ public class QuorumInfo {
         return observers;
     }
 
+    /**
+     * @return The voter nodes in the Raft cluster, or an empty map if KIP-853 
is not enabled.
+     */
+    public Map<Integer, Node> nodes() {
+        return nodes;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
@@ -73,12 +86,13 @@ public class QuorumInfo {
             && leaderEpoch == that.leaderEpoch
             && highWatermark == that.highWatermark
             && Objects.equals(voters, that.voters)
-            && Objects.equals(observers, that.observers);
+            && Objects.equals(observers, that.observers)
+            && Objects.equals(nodes, that.nodes);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(leaderId, leaderEpoch, highWatermark, voters, 
observers);
+        return Objects.hash(leaderId, leaderEpoch, highWatermark, voters, 
observers, nodes);
     }
 
     @Override
@@ -89,26 +103,30 @@ public class QuorumInfo {
             ", highWatermark=" + highWatermark +
             ", voters=" + voters +
             ", observers=" + observers +
+            ", nodes=" + nodes +
             ')';
     }
 
     public static class ReplicaState {
         private final int replicaId;
+        private final Uuid replicaDirectoryId;
         private final long logEndOffset;
         private final OptionalLong lastFetchTimestamp;
         private final OptionalLong lastCaughtUpTimestamp;
 
         ReplicaState() {
-            this(0, 0, OptionalLong.empty(), OptionalLong.empty());
+            this(0, Uuid.ZERO_UUID, 0, OptionalLong.empty(), 
OptionalLong.empty());
         }
 
         ReplicaState(
             int replicaId,
+            Uuid replicaDirectoryId,
             long logEndOffset,
             OptionalLong lastFetchTimestamp,
             OptionalLong lastCaughtUpTimestamp
         ) {
             this.replicaId = replicaId;
+            this.replicaDirectoryId = replicaDirectoryId;
             this.logEndOffset = logEndOffset;
             this.lastFetchTimestamp = lastFetchTimestamp;
             this.lastCaughtUpTimestamp = lastCaughtUpTimestamp;
@@ -122,6 +140,13 @@ public class QuorumInfo {
             return replicaId;
         }
 
+        /**
+         * Return the directory id of the replica if configured, or 
Uuid.ZERO_UUID if not.
+         */
+        public Uuid replicaDirectoryId() {
+            return replicaDirectoryId;
+        }
+
         /**
          * Return the logEndOffset known by the leader for this replica.
          * @return The logEndOffset for this replica
@@ -154,6 +179,7 @@ public class QuorumInfo {
             if (o == null || getClass() != o.getClass()) return false;
             ReplicaState that = (ReplicaState) o;
             return replicaId == that.replicaId
+                && Objects.equals(replicaDirectoryId, that.replicaDirectoryId)
                 && logEndOffset == that.logEndOffset
                 && lastFetchTimestamp.equals(that.lastFetchTimestamp)
                 && lastCaughtUpTimestamp.equals(that.lastCaughtUpTimestamp);
@@ -161,17 +187,57 @@ public class QuorumInfo {
 
         @Override
         public int hashCode() {
-            return Objects.hash(replicaId, logEndOffset, lastFetchTimestamp, 
lastCaughtUpTimestamp);
+            return Objects.hash(replicaId, replicaDirectoryId, logEndOffset, 
lastFetchTimestamp, lastCaughtUpTimestamp);
         }
 
         @Override
         public String toString() {
             return "ReplicaState(" +
                 "replicaId=" + replicaId +
+                ", replicaDirectoryId=" + replicaDirectoryId +
                 ", logEndOffset=" + logEndOffset +
                 ", lastFetchTimestamp=" + lastFetchTimestamp +
                 ", lastCaughtUpTimestamp=" + lastCaughtUpTimestamp +
                 ')';
         }
     }
+
+    public static class Node {
+        private final int nodeId;
+        private final List<RaftVoterEndpoint> endpoints;
+
+        Node(int nodeId, List<RaftVoterEndpoint> endpoints) {
+            this.nodeId = nodeId;
+            this.endpoints = endpoints;
+        }
+
+        public int nodeId() {
+            return nodeId;
+        }
+
+        public List<RaftVoterEndpoint> endpoints() {
+            return endpoints;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            Node node = (Node) o;
+            return nodeId == node.nodeId && Objects.equals(endpoints, 
node.endpoints);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(nodeId, endpoints);
+        }
+
+        @Override
+        public String toString() {
+            return "Node{" +
+                "nodeId=" + nodeId +
+                ", endpoints=" + endpoints +
+                '}';
+        }
+    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumRequest.java
index acdb11c6644..5de7da403d4 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumRequest.java
@@ -83,6 +83,7 @@ public class DescribeQuorumRequest extends AbstractRequest {
 
     public static DescribeQuorumResponseData 
getPartitionLevelErrorResponse(DescribeQuorumRequestData data, Errors error) {
         short errorCode = error.code();
+        String errorMessage = error.message();
 
         List<DescribeQuorumResponseData.TopicData> topicResponses = new 
ArrayList<>();
         for (DescribeQuorumRequestData.TopicData topic : data.topics()) {
@@ -93,6 +94,7 @@ public class DescribeQuorumRequest extends AbstractRequest {
                         requestPartition -> new 
DescribeQuorumResponseData.PartitionData()
                                                 
.setPartitionIndex(requestPartition.partitionIndex())
                                                 .setErrorCode(errorCode)
+                                                .setErrorMessage(errorMessage)
                     ).collect(Collectors.toList())));
         }
 
@@ -100,6 +102,6 @@ public class DescribeQuorumRequest extends AbstractRequest {
     }
 
     public static DescribeQuorumResponseData getTopLevelErrorResponse(Errors 
topLevelError) {
-        return new 
DescribeQuorumResponseData().setErrorCode(topLevelError.code());
+        return new 
DescribeQuorumResponseData().setErrorCode(topLevelError.code()).setErrorMessage(topLevelError.message());
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java
index 39e050c9405..5ad51f4e1de 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java
@@ -84,19 +84,26 @@ public class DescribeQuorumResponse extends 
AbstractResponse {
                 .setTopicName(topicPartition.topic())
                 .setPartitions(Collections.singletonList(new 
DescribeQuorumResponseData.PartitionData()
                     .setPartitionIndex(topicPartition.partition())
-                    .setErrorCode(error.code())))));
+                    .setErrorCode(error.code())
+                    .setErrorMessage(error.message())))));
     }
 
 
     public static DescribeQuorumResponseData singletonResponse(
         TopicPartition topicPartition,
-        DescribeQuorumResponseData.PartitionData partitionData
+        DescribeQuorumResponseData.PartitionData partitionData,
+        DescribeQuorumResponseData.NodeCollection nodes
     ) {
-        return new DescribeQuorumResponseData()
+        DescribeQuorumResponseData res = new DescribeQuorumResponseData()
             .setTopics(Collections.singletonList(new 
DescribeQuorumResponseData.TopicData()
                 .setTopicName(topicPartition.topic())
                 .setPartitions(Collections.singletonList(partitionData
                     .setPartitionIndex(topicPartition.partition())))));
+
+        if (nodes != null)
+            res.setNodes(nodes);
+
+        return res;
     }
 
     public static DescribeQuorumResponse parse(ByteBuffer buffer, short 
version) {
diff --git 
a/clients/src/main/resources/common/message/DescribeQuorumRequest.json 
b/clients/src/main/resources/common/message/DescribeQuorumRequest.json
index cee8fe69822..2faaefec736 100644
--- a/clients/src/main/resources/common/message/DescribeQuorumRequest.json
+++ b/clients/src/main/resources/common/message/DescribeQuorumRequest.json
@@ -19,8 +19,10 @@
   "listeners": ["broker", "controller"],
   "name": "DescribeQuorumRequest",
   // Version 1 adds additional fields in the response. The request is 
unchanged (KIP-836).
-  "validVersions": "0-1",
+  // Version 2 adds additional fields in the response. The request is 
unchanged (KIP-853).
+  "validVersions": "0-2",
   "flexibleVersions": "0+",
+  "latestVersionUnstable": true, // Version 2 is still under development.
   "fields": [
     { "name": "Topics", "type": "[]TopicData",
       "versions": "0+", "fields": [
diff --git 
a/clients/src/main/resources/common/message/DescribeQuorumResponse.json 
b/clients/src/main/resources/common/message/DescribeQuorumResponse.json
index 0ea6271238b..e0be61781f5 100644
--- a/clients/src/main/resources/common/message/DescribeQuorumResponse.json
+++ b/clients/src/main/resources/common/message/DescribeQuorumResponse.json
@@ -18,11 +18,14 @@
   "type": "response",
   "name": "DescribeQuorumResponse",
   // Version 1 adds LastFetchTimeStamp and LastCaughtUpTimestamp in 
ReplicaState (KIP-836).
-  "validVersions": "0-1",
+  // Version 2 adds ErrorMessage, Nodes, ErrorMessage in ParitionData, 
ReplicaDirectoryId in ReplicaState (KIP-853).
+  "validVersions": "0-2",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "ErrorCode", "type": "int16", "versions": "0+",
       "about": "The top level error code."},
+    { "name": "ErrorMessage", "type": "string", "versions": "2+", 
"nullableVersions": "2+", "ignorable": true,
+      "about": "The error message, or null if there was no error." },
     { "name": "Topics", "type": "[]TopicData",
       "versions": "0+", "fields": [
       { "name": "TopicName", "type": "string", "versions": "0+", "entityType": 
"topicName",
@@ -32,6 +35,8 @@
         { "name": "PartitionIndex", "type": "int32", "versions": "0+",
           "about": "The partition index." },
         { "name": "ErrorCode", "type": "int16", "versions": "0+"},
+        { "name": "ErrorMessage", "type": "string", "versions": "2+", 
"nullableVersions": "2+", "ignorable": true,
+          "about": "The error message, or null if there was no error." },
         { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": 
"brokerId",
           "about": "The ID of the current leader or -1 if the leader is 
unknown."},
         { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
@@ -40,10 +45,25 @@
         { "name": "CurrentVoters", "type": "[]ReplicaState", "versions": "0+" 
},
         { "name": "Observers", "type": "[]ReplicaState", "versions": "0+" }
       ]}
-    ]}],
+    ]},
+    { "name": "Nodes", "type": "[]Node", "versions": "2+", "fields": [
+      { "name": "NodeId", "type": "int32", "versions": "2+",
+        "mapKey": true, "entityType": "brokerId", "about": "The ID of the 
associated node" },
+      { "name": "Listeners", "type": "[]Listener",
+        "about": "The listeners of this controller", "versions": "2+", 
"fields": [
+        { "name": "Name", "type": "string", "versions": "2+", "mapKey": true,
+          "about": "The name of the endpoint" },
+        { "name": "Host", "type": "string", "versions": "2+",
+          "about": "The hostname" },
+        { "name": "Port", "type": "uint16", "versions": "2+",
+          "about": "The port" }
+      ]}
+    ]}
+  ],
   "commonStructs": [
     { "name": "ReplicaState", "versions": "0+", "fields": [
       { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": 
"brokerId" },
+      { "name": "ReplicaDirectoryId", "type": "uuid", "versions": "2+" },
       { "name": "LogEndOffset", "type": "int64", "versions": "0+",
         "about": "The last known log end offset of the follower or -1 if it is 
unknown"},
       { "name": "LastFetchTimestamp", "type": "int64", "versions": "1+", 
"ignorable": true, "default": -1,
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index bd7c7d0b1ab..f7b27dcb868 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -269,6 +269,7 @@ import static java.util.Collections.emptyList;
 import static java.util.Collections.emptySet;
 import static java.util.Collections.singleton;
 import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
 import static 
org.apache.kafka.clients.admin.KafkaAdminClient.DEFAULT_LEAVE_GROUP_REASON;
 import static 
org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignablePartitionResponse;
 import static 
org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.ReassignableTopicResponse;
@@ -294,6 +295,7 @@ public class KafkaAdminClientTest {
     private static final Logger log = 
LoggerFactory.getLogger(KafkaAdminClientTest.class);
     private static final String GROUP_ID = "group-0";
     private static final int THROTTLE = 10;
+    public static final Uuid REPLICA_DIRECTORY_ID = Uuid.randomUuid();
 
     @Test
     public void testDefaultApiTimeoutAndRequestTimeoutConflicts() {
@@ -732,12 +734,17 @@ public class KafkaAdminClientTest {
 
     private static QuorumInfo defaultQuorumInfo(boolean emptyOptionals) {
         return new QuorumInfo(1, 1, 1L,
-                singletonList(new QuorumInfo.ReplicaState(1, 100,
+                singletonList(new QuorumInfo.ReplicaState(1,
+                        emptyOptionals ? Uuid.ZERO_UUID : REPLICA_DIRECTORY_ID,
+                        100,
                         emptyOptionals ? OptionalLong.empty() : 
OptionalLong.of(1000),
                         emptyOptionals ? OptionalLong.empty() : 
OptionalLong.of(1000))),
-                singletonList(new QuorumInfo.ReplicaState(1, 100,
+                singletonList(new QuorumInfo.ReplicaState(1,
+                        emptyOptionals ? Uuid.ZERO_UUID : REPLICA_DIRECTORY_ID,
+                        100,
                         emptyOptionals ? OptionalLong.empty() : 
OptionalLong.of(1000),
-                        emptyOptionals ? OptionalLong.empty() : 
OptionalLong.of(1000))));
+                        emptyOptionals ? OptionalLong.empty() : 
OptionalLong.of(1000))),
+                singletonMap(1, new QuorumInfo.Node(1, 
Collections.emptyList())));
     }
 
     private static DescribeQuorumResponse prepareDescribeQuorumResponse(
@@ -755,6 +762,7 @@ public class KafkaAdminClientTest {
         for (int i = 0; i < (partitionCountError ? 2 : 1); i++) {
             DescribeQuorumResponseData.ReplicaState replica = new 
DescribeQuorumResponseData.ReplicaState()
                     .setReplicaId(1)
+                    .setReplicaDirectoryId(emptyOptionals ? Uuid.ZERO_UUID : 
REPLICA_DIRECTORY_ID)
                     .setLogEndOffset(100);
             replica.setLastFetchTimestamp(emptyOptionals ? -1 : 1000);
             replica.setLastCaughtUpTimestamp(emptyOptionals ? -1 : 1000);
@@ -764,12 +772,17 @@ public class KafkaAdminClientTest {
                     .setHighWatermark(1)
                     .setCurrentVoters(singletonList(replica))
                     .setObservers(singletonList(replica))
-                    .setErrorCode(partitionLevelError.code()));
+                    .setErrorCode(partitionLevelError.code())
+                    .setErrorMessage(partitionLevelError.message()));
         }
         for (int i = 0; i < (topicCountError ? 2 : 1); i++) {
             topics.add(new 
DescribeQuorumResponseData.TopicData().setTopicName(topicName).setPartitions(partitions));
         }
-        return new DescribeQuorumResponse(new 
DescribeQuorumResponseData().setTopics(topics).setErrorCode(topLevelError.code()));
+        return new DescribeQuorumResponse(new DescribeQuorumResponseData()
+            .setTopics(topics)
+            .setErrorCode(topLevelError.code())
+            .setErrorMessage(topLevelError.message())
+            .setNodes(new 
DescribeQuorumResponseData.NodeCollection(Collections.singleton(new 
DescribeQuorumResponseData.Node().setNodeId(1)).iterator())));
     }
 
     /**
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
index 98a66eccd5a..b71ded0cb2a 100644
--- a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
@@ -115,7 +115,7 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) 
extends AbstractApiVersio
   def testApiVersionsRequestThroughControllerListener(): Unit = {
     val request = new ApiVersionsRequest.Builder().build()
     val apiVersionsResponse = sendApiVersionsRequest(request, 
cluster.controllerListenerName.get())
-    validateApiVersionsResponse(apiVersionsResponse, 
cluster.controllerListenerName.get())
+    validateApiVersionsResponse(apiVersionsResponse, 
cluster.controllerListenerName.get(), enableUnstableLastVersion = true)
   }
 
   @ClusterTemplate("zkApiVersionsRequest")
@@ -153,7 +153,7 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) 
extends AbstractApiVersio
   def testApiVersionsRequestValidationV0ThroughControllerListener(): Unit = {
     val apiVersionsRequest = new 
ApiVersionsRequest.Builder().build(0.asInstanceOf[Short])
     val apiVersionsResponse = sendApiVersionsRequest(apiVersionsRequest, 
cluster.controllerListenerName.get())
-    validateApiVersionsResponse(apiVersionsResponse, 
cluster.controllerListenerName.get(), apiVersion = 0)
+    validateApiVersionsResponse(apiVersionsResponse, 
cluster.controllerListenerName.get(), apiVersion = 0, enableUnstableLastVersion 
= true)
   }
 
   @ClusterTemplate("zkApiVersionsRequest")
diff --git 
a/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
index e8d94e3bb65..622dcf366a1 100644
--- a/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala
@@ -60,6 +60,7 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
       val response = connectAndReceive[DescribeQuorumResponse](request)
 
       assertEquals(Errors.NONE, Errors.forCode(response.data.errorCode))
+      assertEquals("", response.data.errorMessage)
       assertEquals(1, response.data.topics.size)
 
       val topicData = response.data.topics.get(0)
@@ -69,6 +70,7 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) {
       val partitionData = topicData.partitions.get(0)
       assertEquals(KafkaRaftServer.MetadataPartition.partition, 
partitionData.partitionIndex)
       assertEquals(Errors.NONE, Errors.forCode(partitionData.errorCode))
+      assertEquals("", partitionData.errorMessage())
       assertTrue(partitionData.leaderEpoch > 0)
 
       val leaderId = partitionData.leaderId
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 208f9d59e1b..041d04c21f6 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -615,7 +615,7 @@ class KafkaApisTest extends Logging {
   def testDescribeQuorumNotAllowedForZkClusters(): Unit = {
     val requestData = 
DescribeQuorumRequest.singletonRequest(KafkaRaftServer.MetadataPartition)
     val requestBuilder = new DescribeQuorumRequest.Builder(requestData)
-    val request = buildRequest(requestBuilder.build())
+    val request = 
buildRequest(requestBuilder.build(DescribeQuorumRequestData.HIGHEST_SUPPORTED_VERSION))
 
     
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
       any[Long])).thenReturn(0)
@@ -624,6 +624,7 @@ class KafkaApisTest extends Logging {
 
     val response = verifyNoThrottling[DescribeQuorumResponse](request)
     assertEquals(Errors.UNKNOWN_SERVER_ERROR, 
Errors.forCode(response.data.errorCode))
+    assertEquals(Errors.UNKNOWN_SERVER_ERROR.message(), 
response.data.errorMessage)
   }
 
   @Test
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java 
b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index aa07ce89203..eb29f7321ff 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -314,7 +314,7 @@ final public class KafkaRaftClient<T> implements 
RaftClient<T> {
     ) {
         final LogOffsetMetadata endOffsetMetadata = log.endOffset();
 
-        if (state.updateLocalState(endOffsetMetadata, 
partitionState.lastVoterSet().voterIds())) {
+        if (state.updateLocalState(endOffsetMetadata, 
partitionState.lastVoterSet().voters())) {
             onUpdateLeaderHighWatermark(state, currentTimeMs);
         }
 
@@ -1156,7 +1156,7 @@ final public class KafkaRaftClient<T> implements 
RaftClient<T> {
             if (validOffsetAndEpoch.kind() == ValidOffsetAndEpoch.Kind.VALID) {
                 LogFetchInfo info = log.read(fetchOffset, 
Isolation.UNCOMMITTED);
 
-                if (state.updateReplicaState(replicaId, currentTimeMs, 
info.startOffsetMetadata)) {
+                if (state.updateReplicaState(replicaId, Uuid.ZERO_UUID, 
currentTimeMs, info.startOffsetMetadata)) {
                     onUpdateLeaderHighWatermark(state, currentTimeMs);
                 }
 
@@ -1353,7 +1353,10 @@ final public class KafkaRaftClient<T> implements 
RaftClient<T> {
         LeaderState<T> leaderState = quorum.leaderStateOrThrow();
         return DescribeQuorumResponse.singletonResponse(
             log.topicPartition(),
-            leaderState.describeQuorum(currentTimeMs)
+            leaderState.describeQuorum(currentTimeMs),
+            requestMetadata.apiVersion() < 
DescribeQuorumResponseData.Node.LOWEST_SUPPORTED_VERSION
+                ? null
+                : leaderState.nodes(currentTimeMs)
         );
     }
 
@@ -1894,6 +1897,7 @@ final public class KafkaRaftClient<T> implements 
RaftClient<T> {
 
             RaftRequest.Outbound requestMessage = new RaftRequest.Outbound(
                 correlationId,
+                request.highestSupportedVersion(),
                 request,
                 destination,
                 currentTimeMs
diff --git 
a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClientDriver.java 
b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClientDriver.java
index 01edf845c60..67fa3eff930 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClientDriver.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClientDriver.java
@@ -106,6 +106,7 @@ public class KafkaRaftClientDriver<T> extends 
ShutdownableThread {
     ) {
         RaftRequest.Inbound inboundRequest = new RaftRequest.Inbound(
             header.correlationId(),
+            header.apiVersion(),
             request,
             createdTimeMs
         );
diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java 
b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
index 13a6d106345..fcb4a5950bc 100644
--- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
@@ -16,9 +16,10 @@
  */
 package org.apache.kafka.raft;
 
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.message.DescribeQuorumResponseData;
-import org.apache.kafka.common.message.LeaderChangeMessage.Voter;
 import org.apache.kafka.common.message.LeaderChangeMessage;
+import org.apache.kafka.common.message.LeaderChangeMessage.Voter;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.ControlRecordUtils;
 import org.apache.kafka.common.utils.LogContext;
@@ -26,9 +27,11 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Timer;
 import org.apache.kafka.raft.internals.BatchAccumulator;
 import org.apache.kafka.raft.internals.ReplicaKey;
+import org.apache.kafka.raft.internals.VoterSet;
 import org.slf4j.Logger;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -51,6 +54,7 @@ public class LeaderState<T> implements EpochState {
     static final double CHECK_QUORUM_TIMEOUT_FACTOR = 1.5;
 
     private final int localId;
+    private final Uuid localDirectoryId;
     private final int epoch;
     private final long epochStartOffset;
     private final Set<Integer> grantingVoters;
@@ -71,21 +75,23 @@ public class LeaderState<T> implements EpochState {
     protected LeaderState(
         Time time,
         int localId,
+        Uuid localDirectoryId,
         int epoch,
         long epochStartOffset,
-        Set<Integer> voters,
+        Map<Integer, VoterSet.VoterNode> voters,
         Set<Integer> grantingVoters,
         BatchAccumulator<T> accumulator,
         int fetchTimeoutMs,
         LogContext logContext
     ) {
         this.localId = localId;
+        this.localDirectoryId = localDirectoryId;
         this.epoch = epoch;
         this.epochStartOffset = epochStartOffset;
 
-        for (int voterId : voters) {
-            boolean hasAcknowledgedLeader = voterId == localId;
-            this.voterStates.put(voterId, new ReplicaState(voterId, 
hasAcknowledgedLeader));
+        for (Map.Entry<Integer, VoterSet.VoterNode> voter : voters.entrySet()) 
{
+            boolean hasAcknowledgedLeader = voter.getKey() == localId;
+            this.voterStates.put(voter.getKey(), new 
ReplicaState(voter.getKey(), voter.getValue().voterKey().directoryId(), 
hasAcknowledgedLeader));
         }
         this.grantingVoters = Collections.unmodifiableSet(new 
HashSet<>(grantingVoters));
         this.log = logContext.logger(LeaderState.class);
@@ -200,6 +206,10 @@ public class LeaderState<T> implements EpochState {
         return localId;
     }
 
+    public Uuid localDirectoryId() {
+        return localDirectoryId;
+    }
+
     public Set<Integer> nonAcknowledgingVoters() {
         Set<Integer> nonAcknowledging = new HashSet<>();
         for (ReplicaState state : voterStates.values()) {
@@ -298,14 +308,14 @@ public class LeaderState<T> implements EpochState {
      * Update the local replica state.
      *
      * @param endOffsetMetadata updated log end offset of local replica
-     * @param lastVoterSet the up-to-date voter set
+     * @param lastVoters the up-to-date voter set
      * @return true if the high watermark is updated as a result of this call
      */
     public boolean updateLocalState(
         LogOffsetMetadata endOffsetMetadata,
-        Set<Integer> lastVoterSet
+        Map<Integer, VoterSet.VoterNode> lastVoters
     ) {
-        ReplicaState state = getOrCreateReplicaState(localId);
+        ReplicaState state = getOrCreateReplicaState(localId, 
localDirectoryId);
         state.endOffset.ifPresent(currentEndOffset -> {
             if (currentEndOffset.offset > endOffsetMetadata.offset) {
                 throw new IllegalStateException("Detected non-monotonic update 
of local " +
@@ -313,7 +323,7 @@ public class LeaderState<T> implements EpochState {
             }
         });
         state.updateLeaderEndOffset(endOffsetMetadata);
-        updateVoterAndObserverStates(lastVoterSet);
+        updateVoterAndObserverStates(lastVoters);
         return maybeUpdateHighWatermark();
     }
 
@@ -321,12 +331,14 @@ public class LeaderState<T> implements EpochState {
      * Update the replica state in terms of fetch time and log end offsets.
      *
      * @param replicaId replica id
+     * @param replicaDirectoryId replica directory id
      * @param currentTimeMs current time in milliseconds
      * @param fetchOffsetMetadata new log offset and metadata
      * @return true if the high watermark is updated as a result of this call
      */
     public boolean updateReplicaState(
         int replicaId,
+        Uuid replicaDirectoryId,
         long currentTimeMs,
         LogOffsetMetadata fetchOffsetMetadata
     ) {
@@ -338,7 +350,7 @@ public class LeaderState<T> implements EpochState {
             throw new IllegalStateException("Remote replica ID " + replicaId + 
" matches the local leader ID");
         }
 
-        ReplicaState state = getOrCreateReplicaState(replicaId);
+        ReplicaState state = getOrCreateReplicaState(replicaId, 
replicaDirectoryId);
 
         state.endOffset.ifPresent(currentEndOffset -> {
             if (currentEndOffset.offset > fetchOffsetMetadata.offset) {
@@ -346,7 +358,7 @@ public class LeaderState<T> implements EpochState {
                     state.nodeId, currentEndOffset.offset, 
fetchOffsetMetadata.offset);
             }
         });
-        Optional<LogOffsetMetadata> leaderEndOffsetOpt = 
getOrCreateReplicaState(localId).endOffset;
+        Optional<LogOffsetMetadata> leaderEndOffsetOpt = 
getOrCreateReplicaState(localId, localDirectoryId).endOffset;
 
         state.updateFollowerState(
             currentTimeMs,
@@ -387,10 +399,10 @@ public class LeaderState<T> implements EpochState {
         return epochStartOffset;
     }
 
-    private ReplicaState getOrCreateReplicaState(int remoteNodeId) {
+    private ReplicaState getOrCreateReplicaState(int remoteNodeId, Uuid 
remoteNodeDirectory) {
         ReplicaState state = voterStates.get(remoteNodeId);
         if (state == null) {
-            observerStates.putIfAbsent(remoteNodeId, new 
ReplicaState(remoteNodeId, false));
+            observerStates.putIfAbsent(remoteNodeId, new 
ReplicaState(remoteNodeId, Optional.of(remoteNodeDirectory), false));
             return observerStates.get(remoteNodeId);
         }
         return state;
@@ -408,6 +420,29 @@ public class LeaderState<T> implements EpochState {
             .setObservers(describeReplicaStates(observerStates, 
currentTimeMs));
     }
 
+    public DescribeQuorumResponseData.NodeCollection nodes(long currentTimeMs) 
{
+        clearInactiveObservers(currentTimeMs);
+
+        return nodes(voterStates.values(), observerStates.values());
+    }
+
+    private static DescribeQuorumResponseData.NodeCollection 
nodes(Collection<ReplicaState> voters, Collection<ReplicaState> observers) {
+        DescribeQuorumResponseData.NodeCollection res = new 
DescribeQuorumResponseData.NodeCollection();
+
+        voters.forEach(replicaState -> node(res, replicaState));
+        observers.forEach(replicaState -> node(res, replicaState));
+
+        return res;
+    }
+
+    private static void node(DescribeQuorumResponseData.NodeCollection res, 
ReplicaState replicaState) {
+        if (res.find(replicaState.nodeId) != null) {
+            return;
+        }
+
+        res.add(new 
DescribeQuorumResponseData.Node().setNodeId(replicaState.nodeId));
+    }
+
     private List<DescribeQuorumResponseData.ReplicaState> 
describeReplicaStates(
         Map<Integer, ReplicaState> state,
         long currentTimeMs
@@ -432,6 +467,7 @@ public class LeaderState<T> implements EpochState {
         }
         return new DescribeQuorumResponseData.ReplicaState()
             .setReplicaId(replicaState.nodeId)
+            
.setReplicaDirectoryId(replicaState.nodeDirectory.orElse(Uuid.ZERO_UUID))
             .setLogEndOffset(replicaState.endOffset.map(md -> 
md.offset).orElse(-1L))
             .setLastCaughtUpTimestamp(lastCaughtUpTimestamp)
             .setLastFetchTimestamp(lastFetchTimestamp);
@@ -452,11 +488,11 @@ public class LeaderState<T> implements EpochState {
         return voterStates.containsKey(remoteNodeId);
     }
 
-    private void updateVoterAndObserverStates(Set<Integer> lastVoterSet) {
+    private void updateVoterAndObserverStates(Map<Integer, VoterSet.VoterNode> 
lastVoters) {
         // Move any replica that is not in the last voter set from voterStates 
to observerStates
         for (Iterator<Map.Entry<Integer, ReplicaState>> iter = 
voterStates.entrySet().iterator(); iter.hasNext(); ) {
             Map.Entry<Integer, ReplicaState> replica = iter.next();
-            if (!lastVoterSet.contains(replica.getKey())) {
+            if (!lastVoters.containsKey(replica.getKey())) {
                 observerStates.put(replica.getKey(), replica.getValue());
                 iter.remove();
             }
@@ -464,24 +500,26 @@ public class LeaderState<T> implements EpochState {
 
         // Add replicas that are in the last voter set and not in voterStates 
to voterStates (from observerStates
         // if they exist)
-        for (int voterId : lastVoterSet) {
-            if (!voterStates.containsKey(voterId)) {
-                Optional<ReplicaState> existingObserverState = 
Optional.ofNullable(observerStates.remove(voterId));
-                voterStates.put(voterId, existingObserverState.orElse(new 
ReplicaState(voterId, false)));
+        for (Map.Entry<Integer, VoterSet.VoterNode> voter : 
lastVoters.entrySet()) {
+            if (!voterStates.containsKey(voter.getKey())) {
+                Optional<ReplicaState> existingObserverState = 
Optional.ofNullable(observerStates.remove(voter.getKey()));
+                voterStates.put(voter.getKey(), 
existingObserverState.orElse(new ReplicaState(voter.getKey(), 
voter.getValue().voterKey().directoryId(), false)));
             }
         }
     }
 
     private static class ReplicaState implements Comparable<ReplicaState> {
         final int nodeId;
+        final Optional<Uuid> nodeDirectory;
         Optional<LogOffsetMetadata> endOffset;
         long lastFetchTimestamp;
         long lastFetchLeaderLogEndOffset;
         long lastCaughtUpTimestamp;
         boolean hasAcknowledgedLeader;
 
-        public ReplicaState(int nodeId, boolean hasAcknowledgedLeader) {
+        public ReplicaState(int nodeId, Optional<Uuid> nodeDirectory, boolean 
hasAcknowledgedLeader) {
             this.nodeId = nodeId;
+            this.nodeDirectory = nodeDirectory;
             this.endOffset = Optional.empty();
             this.lastFetchTimestamp = -1;
             this.lastFetchLeaderLogEndOffset = -1;
@@ -535,9 +573,10 @@ public class LeaderState<T> implements EpochState {
         @Override
         public String toString() {
             return String.format(
-                "ReplicaState(nodeId=%d, endOffset=%s, lastFetchTimestamp=%s, 
" +
+                "ReplicaState(nodeId=%d, nodeDirectoryId=%s, endOffset=%s, 
lastFetchTimestamp=%s, " +
                         "lastCaughtUpTimestamp=%s, hasAcknowledgedLeader=%s)",
                 nodeId,
+                nodeDirectory,
                 endOffset,
                 lastFetchTimestamp,
                 lastCaughtUpTimestamp,
@@ -559,8 +598,9 @@ public class LeaderState<T> implements EpochState {
     @Override
     public String toString() {
         return String.format(
-            "Leader(localId=%d, epoch=%d, epochStartOffset=%d, 
highWatermark=%s, voterStates=%s)",
+            "Leader(localId=%d, localDirectoryId=%s, epoch=%d, 
epochStartOffset=%d, highWatermark=%s, voterStates=%s)",
             localId,
+            localDirectoryId,
             epoch,
             epochStartOffset,
             highWatermark,
diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java 
b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
index b9b17c5f99b..b46d520a620 100644
--- a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
@@ -518,9 +518,10 @@ public class QuorumState {
         LeaderState<T> state = new LeaderState<>(
             time,
             localIdOrThrow(),
+            localDirectoryId(),
             epoch(),
             epochStartOffset,
-            latestVoterSet.get().voterIds(),
+            latestVoterSet.get().voters(),
             candidateState.grantingVoters(),
             accumulator,
             fetchTimeoutMs,
diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftMessage.java 
b/raft/src/main/java/org/apache/kafka/raft/RaftMessage.java
index f50ec902e0e..a3e71ed1cc1 100644
--- a/raft/src/main/java/org/apache/kafka/raft/RaftMessage.java
+++ b/raft/src/main/java/org/apache/kafka/raft/RaftMessage.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.protocol.ApiMessage;
 public interface RaftMessage {
     int correlationId();
 
-    ApiMessage data();
+    short apiVersion();
 
+    ApiMessage data();
 }
diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftRequest.java 
b/raft/src/main/java/org/apache/kafka/raft/RaftRequest.java
index bf590f56ab1..ba5bfefe211 100644
--- a/raft/src/main/java/org/apache/kafka/raft/RaftRequest.java
+++ b/raft/src/main/java/org/apache/kafka/raft/RaftRequest.java
@@ -23,11 +23,13 @@ import java.util.concurrent.CompletableFuture;
 
 public abstract class RaftRequest implements RaftMessage {
     private final int correlationId;
+    private final short apiVersion;
     private final ApiMessage data;
     private final long createdTimeMs;
 
-    public RaftRequest(int correlationId, ApiMessage data, long createdTimeMs) 
{
+    public RaftRequest(int correlationId, short apiVersion, ApiMessage data, 
long createdTimeMs) {
         this.correlationId = correlationId;
+        this.apiVersion = apiVersion;
         this.data = data;
         this.createdTimeMs = createdTimeMs;
     }
@@ -37,6 +39,11 @@ public abstract class RaftRequest implements RaftMessage {
         return correlationId;
     }
 
+    @Override
+    public short apiVersion() {
+        return apiVersion;
+    }
+
     @Override
     public ApiMessage data() {
         return data;
@@ -49,8 +56,8 @@ public abstract class RaftRequest implements RaftMessage {
     public final static class Inbound extends RaftRequest {
         public final CompletableFuture<RaftResponse.Outbound> completion = new 
CompletableFuture<>();
 
-        public Inbound(int correlationId, ApiMessage data, long createdTimeMs) 
{
-            super(correlationId, data, createdTimeMs);
+        public Inbound(int correlationId, short apiVersion, ApiMessage data, 
long createdTimeMs) {
+            super(correlationId, apiVersion, data, createdTimeMs);
         }
 
         @Override
@@ -68,8 +75,8 @@ public abstract class RaftRequest implements RaftMessage {
         private final Node destination;
         public final CompletableFuture<RaftResponse.Inbound> completion = new 
CompletableFuture<>();
 
-        public Outbound(int correlationId, ApiMessage data, Node destination, 
long createdTimeMs) {
-            super(correlationId, data, createdTimeMs);
+        public Outbound(int correlationId, short apiVersion, ApiMessage data, 
Node destination, long createdTimeMs) {
+            super(correlationId, apiVersion, data, createdTimeMs);
             this.destination = destination;
         }
 
diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftResponse.java 
b/raft/src/main/java/org/apache/kafka/raft/RaftResponse.java
index 9c5047ca92d..fb0f22f2f4b 100644
--- a/raft/src/main/java/org/apache/kafka/raft/RaftResponse.java
+++ b/raft/src/main/java/org/apache/kafka/raft/RaftResponse.java
@@ -33,6 +33,11 @@ public abstract class RaftResponse implements RaftMessage {
         return correlationId;
     }
 
+    @Override
+    public short apiVersion() {
+        return data().highestSupportedVersion();
+    }
+
     @Override
     public ApiMessage data() {
         return data;
diff --git 
a/raft/src/main/java/org/apache/kafka/raft/internals/BlockingMessageQueue.java 
b/raft/src/main/java/org/apache/kafka/raft/internals/BlockingMessageQueue.java
index 9343cca8d47..216a67640c9 100644
--- 
a/raft/src/main/java/org/apache/kafka/raft/internals/BlockingMessageQueue.java
+++ 
b/raft/src/main/java/org/apache/kafka/raft/internals/BlockingMessageQueue.java
@@ -33,6 +33,11 @@ public class BlockingMessageQueue implements 
RaftMessageQueue {
             return 0;
         }
 
+        @Override
+        public short apiVersion() {
+            return 0;
+        }
+
         @Override
         public ApiMessage data() {
             return null;
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java 
b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java
index 393cb373b31..840d7b89931 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java
@@ -138,6 +138,10 @@ final public class VoterSet {
         return voters.keySet();
     }
 
+    public Map<Integer, VoterNode> voters() {
+        return voters;
+    }
+
     /**
      * Adds a voter to the voter set.
      *
@@ -276,7 +280,7 @@ final public class VoterSet {
         private final Map<ListenerName, InetSocketAddress> listeners;
         private final SupportedVersionRange supportedKRaftVersion;
 
-        VoterNode(
+        public VoterNode(
             ReplicaKey voterKey,
             Map<ListenerName, InetSocketAddress> listeners,
             SupportedVersionRange supportedKRaftVersion
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java 
b/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java
index 2455990e770..6b013bac462 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java
@@ -232,6 +232,7 @@ public class KafkaNetworkChannelTest {
         ApiMessage apiRequest = buildTestRequest(apiKey);
         RaftRequest.Outbound request = new RaftRequest.Outbound(
             correlationId,
+            apiRequest.highestSupportedVersion(),
             apiRequest,
             destination,
             createdTimeMs
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java 
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index 00588a42305..3844725ec12 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.raft;
 
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.errors.RecordBatchTooLargeException;
 import org.apache.kafka.common.memory.MemoryPool;
@@ -2313,6 +2314,7 @@ public class KafkaRaftClientTest {
 
         DescribeQuorumResponseData responseData = 
context.collectDescribeQuorumResponse();
         assertEquals(Errors.NONE, Errors.forCode(responseData.errorCode()));
+        assertEquals("", responseData.errorMessage());
 
         assertEquals(1, responseData.topics().size());
         DescribeQuorumResponseData.TopicData topicData = 
responseData.topics().get(0);
@@ -2322,6 +2324,7 @@ public class KafkaRaftClientTest {
         DescribeQuorumResponseData.PartitionData partitionData = 
topicData.partitions().get(0);
         assertEquals(context.metadataPartition.partition(), 
partitionData.partitionIndex());
         assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, 
Errors.forCode(partitionData.errorCode()));
+        assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.message(), 
partitionData.errorMessage());
     }
 
     @Test
@@ -2364,6 +2367,7 @@ public class KafkaRaftClientTest {
             Arrays.asList(
                 new ReplicaState()
                     .setReplicaId(localId)
+                    .setReplicaDirectoryId(Uuid.ZERO_UUID)
                     // As we are appending the records directly to the log,
                     // the leader end offset hasn't been updated yet.
                     .setLogEndOffset(3L)
@@ -2371,17 +2375,20 @@ public class KafkaRaftClientTest {
                     .setLastCaughtUpTimestamp(context.time.milliseconds()),
                 new ReplicaState()
                     .setReplicaId(laggingFollower)
+                    .setReplicaDirectoryId(Uuid.ZERO_UUID)
                     .setLogEndOffset(1L)
                     .setLastFetchTimestamp(laggingFollowerFetchTime)
                     .setLastCaughtUpTimestamp(laggingFollowerFetchTime),
                 new ReplicaState()
                     .setReplicaId(closeFollower)
+                    .setReplicaDirectoryId(Uuid.ZERO_UUID)
                     .setLogEndOffset(3L)
                     .setLastFetchTimestamp(closeFollowerFetchTime)
                     .setLastCaughtUpTimestamp(closeFollowerFetchTime)),
             singletonList(
                 new ReplicaState()
                     .setReplicaId(observerId)
+                    .setReplicaDirectoryId(Uuid.ZERO_UUID)
                     .setLogEndOffset(0L)
                     .setLastFetchTimestamp(observerFetchTime)
                     .setLastCaughtUpTimestamp(-1L)));
diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java 
b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
index 6d31154ccf2..20b9e0b3f29 100644
--- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
@@ -16,12 +16,13 @@
  */
 package org.apache.kafka.raft;
 
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.message.DescribeQuorumResponseData;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.raft.internals.BatchAccumulator;
 import org.apache.kafka.raft.internals.ReplicaKey;
+import org.apache.kafka.raft.internals.VoterSet;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
@@ -29,10 +30,14 @@ import org.mockito.Mockito;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static java.util.Collections.emptySet;
 import static java.util.Collections.singleton;
@@ -45,6 +50,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class LeaderStateTest {
     private final int localId = 0;
+    private final Uuid localDirectoryId = Uuid.randomUuid();
     private final int epoch = 5;
     private final LogContext logContext = new LogContext();
     private final BatchAccumulator<?> accumulator = 
Mockito.mock(BatchAccumulator.class);
@@ -59,9 +65,10 @@ public class LeaderStateTest {
         return new LeaderState<>(
             time,
             localId,
+            Uuid.randomUuid(),
             epoch,
             epochStartOffset,
-            voters,
+            toMap(voters),
             voters,
             accumulator,
             fetchTimeoutMs,
@@ -74,9 +81,10 @@ public class LeaderStateTest {
         assertThrows(NullPointerException.class, () -> new LeaderState<>(
             new MockTime(),
             localId,
+            Uuid.randomUuid(),
             epoch,
             0,
-            Collections.emptySet(),
+            Collections.emptyMap(),
             Collections.emptySet(),
             null,
             fetchTimeoutMs,
@@ -108,12 +116,12 @@ public class LeaderStateTest {
         Set<Integer> voterSet = singleton(localId);
         LeaderState<?> state = newLeaderState(voterSet, 15L);
         assertEquals(Optional.empty(), state.highWatermark());
-        assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), 
voterSet));
+        assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), 
toMap(voterSet)));
         assertEquals(emptySet(), state.nonAcknowledgingVoters());
         assertEquals(Optional.empty(), state.highWatermark());
-        assertTrue(state.updateLocalState(new LogOffsetMetadata(16L), 
voterSet));
+        assertTrue(state.updateLocalState(new LogOffsetMetadata(16L), 
toMap(voterSet)));
         assertEquals(Optional.of(new LogOffsetMetadata(16L)), 
state.highWatermark());
-        assertTrue(state.updateLocalState(new LogOffsetMetadata(20), 
voterSet));
+        assertTrue(state.updateLocalState(new LogOffsetMetadata(20), 
toMap(voterSet)));
         assertEquals(Optional.of(new LogOffsetMetadata(20L)), 
state.highWatermark());
     }
 
@@ -122,10 +130,10 @@ public class LeaderStateTest {
         Set<Integer> voterSet = singleton(localId);
         LeaderState<?> state = newLeaderState(voterSet, 15L);
         assertEquals(Optional.empty(), state.highWatermark());
-        assertTrue(state.updateLocalState(new LogOffsetMetadata(16L), 
voterSet));
+        assertTrue(state.updateLocalState(new LogOffsetMetadata(16L), 
toMap(voterSet)));
         assertEquals(Optional.of(new LogOffsetMetadata(16L)), 
state.highWatermark());
         assertThrows(IllegalStateException.class,
-            () -> state.updateLocalState(new LogOffsetMetadata(15L), 
voterSet));
+            () -> state.updateLocalState(new LogOffsetMetadata(15L), 
toMap(voterSet)));
     }
 
     @Test
@@ -138,42 +146,42 @@ public class LeaderStateTest {
         Set<Integer> voterSet = mkSet(localId, node1, node2);
         LeaderState<?> state = newLeaderState(voterSet, 10L);
         assertEquals(Optional.empty(), state.highWatermark());
-        assertFalse(state.updateLocalState(new LogOffsetMetadata(10L), 
voterSet));
+        assertFalse(state.updateLocalState(new LogOffsetMetadata(10L), 
toMap(voterSet)));
         assertEquals(mkSet(node1, node2), state.nonAcknowledgingVoters());
         assertEquals(Optional.empty(), state.highWatermark());
 
         // Node 1 falls behind
-        assertFalse(state.updateLocalState(new LogOffsetMetadata(11L), 
voterSet));
-        assertFalse(state.updateReplicaState(node1, ++fetchTime, new 
LogOffsetMetadata(10L)));
+        assertFalse(state.updateLocalState(new LogOffsetMetadata(11L), 
toMap(voterSet)));
+        assertFalse(state.updateReplicaState(node1, Uuid.randomUuid(), 
++fetchTime, new LogOffsetMetadata(10L)));
         assertEquals(currentTime, describeVoterState(state, localId, 
currentTime).lastCaughtUpTimestamp());
         assertEquals(caughtUpTime, describeVoterState(state, node1, 
currentTime).lastCaughtUpTimestamp());
 
         // Node 1 catches up to leader
-        assertTrue(state.updateReplicaState(node1, ++fetchTime, new 
LogOffsetMetadata(11L)));
+        assertTrue(state.updateReplicaState(node1, Uuid.randomUuid(), 
++fetchTime, new LogOffsetMetadata(11L)));
         caughtUpTime = fetchTime;
         assertEquals(currentTime, describeVoterState(state, localId, 
currentTime).lastCaughtUpTimestamp());
         assertEquals(caughtUpTime, describeVoterState(state, node1, 
currentTime).lastCaughtUpTimestamp());
 
         // Node 1 falls behind
-        assertFalse(state.updateLocalState(new LogOffsetMetadata(100L), 
voterSet));
-        assertTrue(state.updateReplicaState(node1, ++fetchTime, new 
LogOffsetMetadata(50L)));
+        assertFalse(state.updateLocalState(new LogOffsetMetadata(100L), 
toMap(voterSet)));
+        assertTrue(state.updateReplicaState(node1, Uuid.randomUuid(), 
++fetchTime, new LogOffsetMetadata(50L)));
         assertEquals(currentTime, describeVoterState(state, localId, 
currentTime).lastCaughtUpTimestamp());
         assertEquals(caughtUpTime, describeVoterState(state, node1, 
currentTime).lastCaughtUpTimestamp());
 
         // Node 1 catches up to the last fetch offset
         int prevFetchTime = fetchTime;
-        assertFalse(state.updateLocalState(new LogOffsetMetadata(200L), 
voterSet));
-        assertTrue(state.updateReplicaState(node1, ++fetchTime, new 
LogOffsetMetadata(100L)));
+        assertFalse(state.updateLocalState(new LogOffsetMetadata(200L), 
toMap(voterSet)));
+        assertTrue(state.updateReplicaState(node1, Uuid.randomUuid(), 
++fetchTime, new LogOffsetMetadata(100L)));
         caughtUpTime = prevFetchTime;
         assertEquals(currentTime, describeVoterState(state, localId, 
currentTime).lastCaughtUpTimestamp());
         assertEquals(caughtUpTime, describeVoterState(state, node1, 
currentTime).lastCaughtUpTimestamp());
 
         // Node2 has never caught up to leader
         assertEquals(-1L, describeVoterState(state, node2, 
currentTime).lastCaughtUpTimestamp());
-        assertFalse(state.updateLocalState(new LogOffsetMetadata(300L), 
voterSet));
-        assertTrue(state.updateReplicaState(node2, ++fetchTime, new 
LogOffsetMetadata(200L)));
+        assertFalse(state.updateLocalState(new LogOffsetMetadata(300L), 
toMap(voterSet)));
+        assertTrue(state.updateReplicaState(node2, Uuid.randomUuid(), 
++fetchTime, new LogOffsetMetadata(200L)));
         assertEquals(-1L, describeVoterState(state, node2, 
currentTime).lastCaughtUpTimestamp());
-        assertTrue(state.updateReplicaState(node2, ++fetchTime, new 
LogOffsetMetadata(250L)));
+        assertTrue(state.updateReplicaState(node2, Uuid.randomUuid(), 
++fetchTime, new LogOffsetMetadata(250L)));
         assertEquals(-1L, describeVoterState(state, node2, 
currentTime).lastCaughtUpTimestamp());
     }
 
@@ -189,33 +197,33 @@ public class LeaderStateTest {
         assertEquals(emptySet(), state.nonAcknowledgingVoters());
 
         // Node 1 falls behind
-        assertTrue(state.updateLocalState(new LogOffsetMetadata(11L), 
voterSet));
-        assertFalse(state.updateReplicaState(node1, ++fetchTime, new 
LogOffsetMetadata(10L)));
+        assertTrue(state.updateLocalState(new LogOffsetMetadata(11L), 
toMap(voterSet)));
+        assertFalse(state.updateReplicaState(node1, Uuid.randomUuid(), 
++fetchTime, new LogOffsetMetadata(10L)));
         assertEquals(currentTime, describeVoterState(state, localId, 
currentTime).lastCaughtUpTimestamp());
         assertEquals(caughtUpTime, describeObserverState(state, node1, 
currentTime).lastCaughtUpTimestamp());
 
         // Node 1 catches up to leader
-        assertFalse(state.updateReplicaState(node1, ++fetchTime, new 
LogOffsetMetadata(11L)));
+        assertFalse(state.updateReplicaState(node1, Uuid.randomUuid(), 
++fetchTime, new LogOffsetMetadata(11L)));
         caughtUpTime = fetchTime;
         assertEquals(currentTime, describeVoterState(state, localId, 
currentTime).lastCaughtUpTimestamp());
         assertEquals(caughtUpTime, describeObserverState(state, node1, 
currentTime).lastCaughtUpTimestamp());
 
         // Node 1 falls behind
-        assertTrue(state.updateLocalState(new LogOffsetMetadata(100L), 
voterSet));
-        assertFalse(state.updateReplicaState(node1, ++fetchTime, new 
LogOffsetMetadata(50L)));
+        assertTrue(state.updateLocalState(new LogOffsetMetadata(100L), 
toMap(voterSet)));
+        assertFalse(state.updateReplicaState(node1, Uuid.randomUuid(), 
++fetchTime, new LogOffsetMetadata(50L)));
         assertEquals(currentTime, describeVoterState(state, localId, 
currentTime).lastCaughtUpTimestamp());
         assertEquals(caughtUpTime, describeObserverState(state, node1, 
currentTime).lastCaughtUpTimestamp());
 
         // Node 1 catches up to the last fetch offset
         int prevFetchTime = fetchTime;
-        assertTrue(state.updateLocalState(new LogOffsetMetadata(200L), 
voterSet));
-        assertFalse(state.updateReplicaState(node1, ++fetchTime, new 
LogOffsetMetadata(102L)));
+        assertTrue(state.updateLocalState(new LogOffsetMetadata(200L), 
toMap(voterSet)));
+        assertFalse(state.updateReplicaState(node1, Uuid.randomUuid(), 
++fetchTime, new LogOffsetMetadata(102L)));
         caughtUpTime = prevFetchTime;
         assertEquals(currentTime, describeVoterState(state, localId, 
currentTime).lastCaughtUpTimestamp());
         assertEquals(caughtUpTime, describeObserverState(state, node1, 
currentTime).lastCaughtUpTimestamp());
 
         // Node 1 catches up to leader
-        assertFalse(state.updateReplicaState(node1, ++fetchTime, new 
LogOffsetMetadata(200L)));
+        assertFalse(state.updateReplicaState(node1, Uuid.randomUuid(), 
++fetchTime, new LogOffsetMetadata(200L)));
         caughtUpTime = fetchTime;
         assertEquals(currentTime, describeVoterState(state, localId, 
currentTime).lastCaughtUpTimestamp());
         assertEquals(caughtUpTime, describeObserverState(state, node1, 
currentTime).lastCaughtUpTimestamp());
@@ -226,8 +234,8 @@ public class LeaderStateTest {
         Set<Integer> voterSet = singleton(localId);
         LeaderState<?> state = newLeaderState(voterSet, 15L);
         assertEquals(Optional.empty(), state.highWatermark());
-        assertTrue(state.updateLocalState(new LogOffsetMetadata(16L), 
voterSet));
-        assertFalse(state.updateLocalState(new LogOffsetMetadata(16L), 
voterSet));
+        assertTrue(state.updateLocalState(new LogOffsetMetadata(16L), 
toMap(voterSet)));
+        assertFalse(state.updateLocalState(new LogOffsetMetadata(16L), 
toMap(voterSet)));
         assertEquals(Optional.of(new LogOffsetMetadata(16L)), 
state.highWatermark());
     }
 
@@ -238,11 +246,11 @@ public class LeaderStateTest {
         assertEquals(Optional.empty(), state.highWatermark());
 
         LogOffsetMetadata initialHw = new LogOffsetMetadata(16L, 
Optional.of(new MockOffsetMetadata("bar")));
-        assertTrue(state.updateLocalState(initialHw, voterSet));
+        assertTrue(state.updateLocalState(initialHw, toMap(voterSet)));
         assertEquals(Optional.of(initialHw), state.highWatermark());
 
         LogOffsetMetadata updateHw = new LogOffsetMetadata(16L, 
Optional.of(new MockOffsetMetadata("baz")));
-        assertTrue(state.updateLocalState(updateHw, voterSet));
+        assertTrue(state.updateLocalState(updateHw, toMap(voterSet)));
         assertEquals(Optional.of(updateHw), state.highWatermark());
     }
 
@@ -251,15 +259,15 @@ public class LeaderStateTest {
         int otherNodeId = 1;
         Set<Integer> voterSet = mkSet(localId, otherNodeId);
         LeaderState<?> state = newLeaderState(voterSet, 10L);
-        assertFalse(state.updateLocalState(new LogOffsetMetadata(13L), 
voterSet));
+        assertFalse(state.updateLocalState(new LogOffsetMetadata(13L), 
toMap(voterSet)));
         assertEquals(singleton(otherNodeId), state.nonAcknowledgingVoters());
         assertEquals(Optional.empty(), state.highWatermark());
-        assertFalse(state.updateReplicaState(otherNodeId, 0, new 
LogOffsetMetadata(10L)));
+        assertFalse(state.updateReplicaState(otherNodeId, Uuid.randomUuid(), 
0, new LogOffsetMetadata(10L)));
         assertEquals(emptySet(), state.nonAcknowledgingVoters());
         assertEquals(Optional.empty(), state.highWatermark());
-        assertTrue(state.updateReplicaState(otherNodeId, 0, new 
LogOffsetMetadata(11L)));
+        assertTrue(state.updateReplicaState(otherNodeId, Uuid.randomUuid(), 0, 
new LogOffsetMetadata(11L)));
         assertEquals(Optional.of(new LogOffsetMetadata(11L)), 
state.highWatermark());
-        assertTrue(state.updateReplicaState(otherNodeId, 0, new 
LogOffsetMetadata(13L)));
+        assertTrue(state.updateReplicaState(otherNodeId, Uuid.randomUuid(), 0, 
new LogOffsetMetadata(13L)));
         assertEquals(Optional.of(new LogOffsetMetadata(13L)), 
state.highWatermark());
     }
 
@@ -269,22 +277,22 @@ public class LeaderStateTest {
         int node2 = 2;
         Set<Integer> voterSet = mkSet(localId, node1, node2);
         LeaderState<?> state = newLeaderState(voterSet, 10L);
-        assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), 
voterSet));
+        assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), 
toMap(voterSet)));
         assertEquals(mkSet(node1, node2), state.nonAcknowledgingVoters());
         assertEquals(Optional.empty(), state.highWatermark());
-        assertFalse(state.updateReplicaState(node1, 0, new 
LogOffsetMetadata(10L)));
+        assertFalse(state.updateReplicaState(node1, Uuid.randomUuid(), 0, new 
LogOffsetMetadata(10L)));
         assertEquals(singleton(node2), state.nonAcknowledgingVoters());
         assertEquals(Optional.empty(), state.highWatermark());
-        assertFalse(state.updateReplicaState(node2, 0, new 
LogOffsetMetadata(10L)));
+        assertFalse(state.updateReplicaState(node2, Uuid.randomUuid(), 0, new 
LogOffsetMetadata(10L)));
         assertEquals(emptySet(), state.nonAcknowledgingVoters());
         assertEquals(Optional.empty(), state.highWatermark());
-        assertTrue(state.updateReplicaState(node2, 0, new 
LogOffsetMetadata(15L)));
+        assertTrue(state.updateReplicaState(node2, Uuid.randomUuid(), 0, new 
LogOffsetMetadata(15L)));
         assertEquals(Optional.of(new LogOffsetMetadata(15L)), 
state.highWatermark());
-        assertFalse(state.updateLocalState(new LogOffsetMetadata(20L), 
voterSet));
+        assertFalse(state.updateLocalState(new LogOffsetMetadata(20L), 
toMap(voterSet)));
         assertEquals(Optional.of(new LogOffsetMetadata(15L)), 
state.highWatermark());
-        assertTrue(state.updateReplicaState(node1, 0, new 
LogOffsetMetadata(20L)));
+        assertTrue(state.updateReplicaState(node1, Uuid.randomUuid(), 0, new 
LogOffsetMetadata(20L)));
         assertEquals(Optional.of(new LogOffsetMetadata(20L)), 
state.highWatermark());
-        assertFalse(state.updateReplicaState(node2, 0, new 
LogOffsetMetadata(20L)));
+        assertFalse(state.updateReplicaState(node2, Uuid.randomUuid(), 0, new 
LogOffsetMetadata(20L)));
         assertEquals(Optional.of(new LogOffsetMetadata(20L)), 
state.highWatermark());
     }
 
@@ -294,23 +302,23 @@ public class LeaderStateTest {
         int node2 = 2;
         Set<Integer> originalVoterSet = mkSet(localId, node1);
         LeaderState<?> state = newLeaderState(originalVoterSet, 5L);
-        assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), 
originalVoterSet));
-        assertTrue(state.updateReplicaState(node1, 0, new 
LogOffsetMetadata(10L)));
+        assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), 
toMap(originalVoterSet)));
+        assertTrue(state.updateReplicaState(node1, Uuid.randomUuid(), 0, new 
LogOffsetMetadata(10L)));
         assertEquals(Optional.of(new LogOffsetMetadata(10L)), 
state.highWatermark());
 
         // updating replica state of node2 before it joins voterSet should not 
increase HW to 15L
-        assertFalse(state.updateReplicaState(node2, 0, new 
LogOffsetMetadata(15L)));
+        assertFalse(state.updateReplicaState(node2, Uuid.randomUuid(), 0, new 
LogOffsetMetadata(15L)));
         assertEquals(Optional.of(new LogOffsetMetadata(10L)), 
state.highWatermark());
 
         // adding node2 to voterSet will cause HW to increase to 15L
         Set<Integer> voterSetWithNode2 = mkSet(localId, node1, node2);
-        assertTrue(state.updateLocalState(new LogOffsetMetadata(15L), 
voterSetWithNode2));
+        assertTrue(state.updateLocalState(new LogOffsetMetadata(15L), 
toMap(voterSetWithNode2)));
         assertEquals(Optional.of(new LogOffsetMetadata(15L)), 
state.highWatermark());
 
         // HW will not update to 16L until a majority reaches it
-        assertFalse(state.updateLocalState(new LogOffsetMetadata(16L), 
voterSetWithNode2));
+        assertFalse(state.updateLocalState(new LogOffsetMetadata(16L), 
toMap(voterSetWithNode2)));
         assertEquals(Optional.of(new LogOffsetMetadata(15L)), 
state.highWatermark());
-        assertTrue(state.updateReplicaState(node2, 0, new 
LogOffsetMetadata(16L)));
+        assertTrue(state.updateReplicaState(node2, Uuid.randomUuid(), 0, new 
LogOffsetMetadata(16L)));
         assertEquals(Optional.of(new LogOffsetMetadata(16L)), 
state.highWatermark());
     }
 
@@ -322,29 +330,29 @@ public class LeaderStateTest {
         // start with three voters with HW at 15L
         Set<Integer> originalVoterSet = mkSet(localId, node1, node2);
         LeaderState<?> state = newLeaderState(originalVoterSet, 5L);
-        assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), 
originalVoterSet));
-        assertTrue(state.updateReplicaState(node1, 0, new 
LogOffsetMetadata(15L)));
+        assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), 
toMap(originalVoterSet)));
+        assertTrue(state.updateReplicaState(node1, Uuid.randomUuid(), 0, new 
LogOffsetMetadata(15L)));
         assertEquals(Optional.of(new LogOffsetMetadata(15L)), 
state.highWatermark());
-        assertFalse(state.updateReplicaState(node2, 0, new 
LogOffsetMetadata(10L)));
+        assertFalse(state.updateReplicaState(node2, Uuid.randomUuid(), 0, new 
LogOffsetMetadata(10L)));
 
         // updating replica state of node3 before it joins voterSet
-        assertFalse(state.updateReplicaState(node3, 0, new 
LogOffsetMetadata(10L)));
+        assertFalse(state.updateReplicaState(node3, Uuid.randomUuid(), 0, new 
LogOffsetMetadata(10L)));
 
         // adding node3 to voterSet should not cause HW to decrease even if 
majority is < HW
         Set<Integer> voterSetWithNode3 = mkSet(localId, node1, node2, node3);
-        assertFalse(state.updateLocalState(new LogOffsetMetadata(16L), 
voterSetWithNode3));
+        assertFalse(state.updateLocalState(new LogOffsetMetadata(16L), 
toMap(voterSetWithNode3)));
         assertEquals(Optional.of(new LogOffsetMetadata(15L)), 
state.highWatermark());
 
         // HW will not decrease if calculated HW is anything lower than the 
last HW
-        assertFalse(state.updateReplicaState(node2, 0, new 
LogOffsetMetadata(13L)));
+        assertFalse(state.updateReplicaState(node2, Uuid.randomUuid(), 0, new 
LogOffsetMetadata(13L)));
         assertEquals(Optional.of(new LogOffsetMetadata(15L)), 
state.highWatermark());
-        assertFalse(state.updateReplicaState(node3, 0, new 
LogOffsetMetadata(13L)));
+        assertFalse(state.updateReplicaState(node3, Uuid.randomUuid(), 0, new 
LogOffsetMetadata(13L)));
         assertEquals(Optional.of(new LogOffsetMetadata(15L)), 
state.highWatermark());
-        assertFalse(state.updateReplicaState(node1, 0, new 
LogOffsetMetadata(16L)));
+        assertFalse(state.updateReplicaState(node1, Uuid.randomUuid(), 0, new 
LogOffsetMetadata(16L)));
         assertEquals(Optional.of(new LogOffsetMetadata(15L)), 
state.highWatermark());
 
         // HW will update to 16L once a majority of the voterSet is at least 
16L
-        assertTrue(state.updateReplicaState(node3, 0, new 
LogOffsetMetadata(16L)));
+        assertTrue(state.updateReplicaState(node3, Uuid.randomUuid(), 0, new 
LogOffsetMetadata(16L)));
         assertEquals(Optional.of(new LogOffsetMetadata(16L)), 
state.highWatermark());
     }
 
@@ -354,28 +362,28 @@ public class LeaderStateTest {
         int node2 = 2;
         Set<Integer> originalVoterSet = mkSet(localId, node1, node2);
         LeaderState<?> state = newLeaderState(originalVoterSet, 10L);
-        assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), 
originalVoterSet));
-        assertTrue(state.updateReplicaState(node1, 0, new 
LogOffsetMetadata(15L)));
-        assertFalse(state.updateReplicaState(node2, 0, new 
LogOffsetMetadata(10L)));
+        assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), 
toMap(originalVoterSet)));
+        assertTrue(state.updateReplicaState(node1, Uuid.randomUuid(), 0, new 
LogOffsetMetadata(15L)));
+        assertFalse(state.updateReplicaState(node2, Uuid.randomUuid(), 0, new 
LogOffsetMetadata(10L)));
         assertEquals(Optional.of(new LogOffsetMetadata(15L)), 
state.highWatermark());
 
         // removing node1 should not decrement HW to 10L
         Set<Integer> voterSetWithoutNode1 = mkSet(localId, node2);
-        assertFalse(state.updateLocalState(new LogOffsetMetadata(17L), 
voterSetWithoutNode1));
+        assertFalse(state.updateLocalState(new LogOffsetMetadata(17L), 
toMap(voterSetWithoutNode1)));
         assertEquals(Optional.of(new LogOffsetMetadata(15L)), 
state.highWatermark());
 
         // HW cannot change until after node2 catches up to last HW
-        assertFalse(state.updateReplicaState(node2, 0, new 
LogOffsetMetadata(14L)));
+        assertFalse(state.updateReplicaState(node2, Uuid.randomUuid(), 0, new 
LogOffsetMetadata(14L)));
         assertEquals(Optional.of(new LogOffsetMetadata(15L)), 
state.highWatermark());
-        assertFalse(state.updateLocalState(new LogOffsetMetadata(18L), 
voterSetWithoutNode1));
+        assertFalse(state.updateLocalState(new LogOffsetMetadata(18L), 
toMap(voterSetWithoutNode1)));
         assertEquals(Optional.of(new LogOffsetMetadata(15L)), 
state.highWatermark());
-        assertFalse(state.updateReplicaState(node1, 0, new 
LogOffsetMetadata(18L)));
+        assertFalse(state.updateReplicaState(node1, Uuid.randomUuid(), 0, new 
LogOffsetMetadata(18L)));
         assertEquals(Optional.of(new LogOffsetMetadata(15L)), 
state.highWatermark());
-        assertFalse(state.updateReplicaState(node2, 0, new 
LogOffsetMetadata(15L)));
+        assertFalse(state.updateReplicaState(node2, Uuid.randomUuid(), 0, new 
LogOffsetMetadata(15L)));
         assertEquals(Optional.of(new LogOffsetMetadata(15L)), 
state.highWatermark());
 
         // HW should update to 16L
-        assertTrue(state.updateReplicaState(node2, 0, new 
LogOffsetMetadata(16L)));
+        assertTrue(state.updateReplicaState(node2, Uuid.randomUuid(), 0, new 
LogOffsetMetadata(16L)));
         assertEquals(Optional.of(new LogOffsetMetadata(16L)), 
state.highWatermark());
     }
 
@@ -385,28 +393,28 @@ public class LeaderStateTest {
         int node2 = 2;
         Set<Integer> originalVoterSet = mkSet(localId, node1, node2);
         LeaderState<?> state = newLeaderState(originalVoterSet, 10L);
-        assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), 
originalVoterSet));
-        assertTrue(state.updateReplicaState(node1, 0, new 
LogOffsetMetadata(15L)));
-        assertFalse(state.updateReplicaState(node2, 0, new 
LogOffsetMetadata(10L)));
+        assertFalse(state.updateLocalState(new LogOffsetMetadata(15L), 
toMap(originalVoterSet)));
+        assertTrue(state.updateReplicaState(node1, Uuid.randomUuid(), 0, new 
LogOffsetMetadata(15L)));
+        assertFalse(state.updateReplicaState(node2, Uuid.randomUuid(), 0, new 
LogOffsetMetadata(10L)));
         assertEquals(Optional.of(new LogOffsetMetadata(15L)), 
state.highWatermark());
 
         // removing leader should not decrement HW to 10L
         Set<Integer> voterSetWithoutLeader = mkSet(node1, node2);
-        assertFalse(state.updateLocalState(new LogOffsetMetadata(17L), 
voterSetWithoutLeader));
+        assertFalse(state.updateLocalState(new LogOffsetMetadata(17L), 
toMap(voterSetWithoutLeader)));
         assertEquals(Optional.of(new LogOffsetMetadata(15L)), 
state.highWatermark());
 
         // HW cannot change until node2 catches up to last HW
-        assertFalse(state.updateReplicaState(node1, 0, new 
LogOffsetMetadata(16L)));
+        assertFalse(state.updateReplicaState(node1, Uuid.randomUuid(), 0, new 
LogOffsetMetadata(16L)));
         assertEquals(Optional.of(new LogOffsetMetadata(15L)), 
state.highWatermark());
-        assertFalse(state.updateLocalState(new LogOffsetMetadata(18L), 
voterSetWithoutLeader));
+        assertFalse(state.updateLocalState(new LogOffsetMetadata(18L), 
toMap(voterSetWithoutLeader)));
         assertEquals(Optional.of(new LogOffsetMetadata(15L)), 
state.highWatermark());
-        assertFalse(state.updateReplicaState(node2, 0, new 
LogOffsetMetadata(14L)));
+        assertFalse(state.updateReplicaState(node2, Uuid.randomUuid(), 0, new 
LogOffsetMetadata(14L)));
         assertEquals(Optional.of(new LogOffsetMetadata(15L)), 
state.highWatermark());
-        assertFalse(state.updateReplicaState(node2, 0, new 
LogOffsetMetadata(15L)));
+        assertFalse(state.updateReplicaState(node2, Uuid.randomUuid(), 0, new 
LogOffsetMetadata(15L)));
         assertEquals(Optional.of(new LogOffsetMetadata(15L)), 
state.highWatermark());
 
         // HW will not update to 16L until majority of remaining voterSet 
(node1, node2) are at least 16L
-        assertTrue(state.updateReplicaState(node2, 0, new 
LogOffsetMetadata(16L)));
+        assertTrue(state.updateReplicaState(node2, Uuid.randomUuid(), 0, new 
LogOffsetMetadata(16L)));
         assertEquals(Optional.of(new LogOffsetMetadata(16L)), 
state.highWatermark());
     }
 
@@ -416,13 +424,13 @@ public class LeaderStateTest {
         int node1 = 1;
         Set<Integer> voterSet = mkSet(localId, node1);
         LeaderState<?> state = newLeaderState(voterSet, 0L);
-        state.updateLocalState(new LogOffsetMetadata(10L), voterSet);
-        state.updateReplicaState(node1, time.milliseconds(), new 
LogOffsetMetadata(10L));
+        state.updateLocalState(new LogOffsetMetadata(10L), toMap(voterSet));
+        state.updateReplicaState(node1, Uuid.randomUuid(), 
time.milliseconds(), new LogOffsetMetadata(10L));
         assertEquals(Optional.of(new LogOffsetMetadata(10L)), 
state.highWatermark());
 
         // Follower crashes and disk is lost. It fetches an earlier offset to 
rebuild state.
         // The leader will report an error in the logs, but will not let the 
high watermark rewind
-        assertFalse(state.updateReplicaState(node1, time.milliseconds(), new 
LogOffsetMetadata(5L)));
+        assertFalse(state.updateReplicaState(node1, Uuid.randomUuid(), 
time.milliseconds(), new LogOffsetMetadata(5L)));
         assertEquals(5L, describeVoterState(state, node1, 
time.milliseconds()).logEndOffset());
         assertEquals(Optional.of(new LogOffsetMetadata(10L)), 
state.highWatermark());
     }
@@ -459,6 +467,7 @@ public class LeaderStateTest {
         assertEquals(1, partitionData.currentVoters().size());
         assertEquals(new DescribeQuorumResponseData.ReplicaState()
                 .setReplicaId(localId)
+                .setReplicaDirectoryId(localDirectoryId)
                 .setLogEndOffset(-1)
                 .setLastFetchTimestamp(time.milliseconds())
                 .setLastCaughtUpTimestamp(time.milliseconds()),
@@ -466,7 +475,7 @@ public class LeaderStateTest {
 
 
         // Now update the high watermark and verify the describe output
-        assertTrue(state.updateLocalState(new 
LogOffsetMetadata(leaderEndOffset), voterSet));
+        assertTrue(state.updateLocalState(new 
LogOffsetMetadata(leaderEndOffset), toMap(voterSet)));
         assertEquals(Optional.of(new LogOffsetMetadata(leaderEndOffset)), 
state.highWatermark());
 
         time.sleep(500);
@@ -479,6 +488,7 @@ public class LeaderStateTest {
         assertEquals(1, partitionData.currentVoters().size());
         assertEquals(new DescribeQuorumResponseData.ReplicaState()
                 .setReplicaId(localId)
+                .setReplicaDirectoryId(localDirectoryId)
                 .setLogEndOffset(leaderEndOffset)
                 .setLastFetchTimestamp(time.milliseconds())
                 .setLastCaughtUpTimestamp(time.milliseconds()),
@@ -489,17 +499,34 @@ public class LeaderStateTest {
     public void testDescribeQuorumWithMultipleVoters() {
         MockTime time = new MockTime();
         int activeFollowerId = 1;
+        Uuid activeFollowerDirectoryId = Uuid.randomUuid();
         int inactiveFollowerId = 2;
+        Uuid inactiveFollowerDirectoryId = Uuid.randomUuid();
         long leaderStartOffset = 10L;
         long leaderEndOffset = 15L;
 
-        Set<Integer> voterSet = mkSet(localId, activeFollowerId, 
inactiveFollowerId);
-        LeaderState<?> state = newLeaderState(voterSet, leaderStartOffset);
-        assertFalse(state.updateLocalState(new 
LogOffsetMetadata(leaderEndOffset), voterSet));
+        Map<Integer, VoterSet.VoterNode> voters = new HashMap<>();
+        voters.put(localId, voterNode(localId, localDirectoryId));
+        voters.put(activeFollowerId, voterNode(activeFollowerId, 
activeFollowerDirectoryId));
+        voters.put(inactiveFollowerId, voterNode(inactiveFollowerId, 
inactiveFollowerDirectoryId));
+
+        LeaderState<?> state = new LeaderState<>(
+            time,
+            localId,
+            Uuid.randomUuid(),
+            epoch,
+            leaderStartOffset,
+            voters,
+            voters.keySet(),
+            accumulator,
+            fetchTimeoutMs,
+            logContext
+        );
+        assertFalse(state.updateLocalState(new 
LogOffsetMetadata(leaderEndOffset), voters));
         assertEquals(Optional.empty(), state.highWatermark());
 
         long activeFollowerFetchTimeMs = time.milliseconds();
-        assertTrue(state.updateReplicaState(activeFollowerId, 
activeFollowerFetchTimeMs, new LogOffsetMetadata(leaderEndOffset)));
+        assertTrue(state.updateReplicaState(activeFollowerId, 
activeFollowerDirectoryId, activeFollowerFetchTimeMs, new 
LogOffsetMetadata(leaderEndOffset)));
         assertEquals(Optional.of(new LogOffsetMetadata(leaderEndOffset)), 
state.highWatermark());
 
         time.sleep(500);
@@ -517,6 +544,7 @@ public class LeaderStateTest {
             findReplicaOrFail(localId, partitionData.currentVoters());
         assertEquals(new DescribeQuorumResponseData.ReplicaState()
                 .setReplicaId(localId)
+                .setReplicaDirectoryId(localDirectoryId)
                 .setLogEndOffset(leaderEndOffset)
                 .setLastFetchTimestamp(time.milliseconds())
                 .setLastCaughtUpTimestamp(time.milliseconds()),
@@ -526,6 +554,7 @@ public class LeaderStateTest {
             findReplicaOrFail(activeFollowerId, partitionData.currentVoters());
         assertEquals(new DescribeQuorumResponseData.ReplicaState()
                 .setReplicaId(activeFollowerId)
+                .setReplicaDirectoryId(activeFollowerDirectoryId)
                 .setLogEndOffset(leaderEndOffset)
                 .setLastFetchTimestamp(activeFollowerFetchTimeMs)
                 .setLastCaughtUpTimestamp(activeFollowerFetchTimeMs),
@@ -535,6 +564,7 @@ public class LeaderStateTest {
             findReplicaOrFail(inactiveFollowerId, 
partitionData.currentVoters());
         assertEquals(new DescribeQuorumResponseData.ReplicaState()
                 .setReplicaId(inactiveFollowerId)
+                .setReplicaDirectoryId(inactiveFollowerDirectoryId)
                 .setLogEndOffset(-1)
                 .setLastFetchTimestamp(-1)
                 .setLastCaughtUpTimestamp(-1),
@@ -547,10 +577,10 @@ public class LeaderStateTest {
                                                    long leaderEndOffset) {
         Set<Integer> voterSet = mkSet(localId, follower1, follower2);
         LeaderState<?> state = newLeaderState(voterSet, leaderStartOffset);
-        state.updateLocalState(new LogOffsetMetadata(leaderEndOffset), 
voterSet);
+        state.updateLocalState(new LogOffsetMetadata(leaderEndOffset), 
toMap(voterSet));
         assertEquals(Optional.empty(), state.highWatermark());
-        state.updateReplicaState(follower1, 0, new 
LogOffsetMetadata(leaderStartOffset));
-        state.updateReplicaState(follower2, 0, new 
LogOffsetMetadata(leaderEndOffset));
+        state.updateReplicaState(follower1, Uuid.randomUuid(), 0, new 
LogOffsetMetadata(leaderStartOffset));
+        state.updateReplicaState(follower2, Uuid.randomUuid(), 0, new 
LogOffsetMetadata(leaderEndOffset));
         return state;
     }
 
@@ -558,16 +588,17 @@ public class LeaderStateTest {
     public void testDescribeQuorumWithObservers() {
         MockTime time = new MockTime();
         int observerId = 10;
+        Uuid observerDirectoryId = Uuid.randomUuid();
         long epochStartOffset = 10L;
 
         Set<Integer> voterSet = singleton(localId);
         LeaderState<?> state = newLeaderState(voterSet, epochStartOffset);
-        assertTrue(state.updateLocalState(new 
LogOffsetMetadata(epochStartOffset + 1), voterSet));
+        assertTrue(state.updateLocalState(new 
LogOffsetMetadata(epochStartOffset + 1), toMap(voterSet)));
         assertEquals(Optional.of(new LogOffsetMetadata(epochStartOffset + 1)), 
state.highWatermark());
 
         time.sleep(500);
         long observerFetchTimeMs = time.milliseconds();
-        assertFalse(state.updateReplicaState(observerId, observerFetchTimeMs, 
new LogOffsetMetadata(epochStartOffset + 1)));
+        assertFalse(state.updateReplicaState(observerId, observerDirectoryId, 
observerFetchTimeMs, new LogOffsetMetadata(epochStartOffset + 1)));
 
         time.sleep(500);
         DescribeQuorumResponseData.PartitionData partitionData = 
state.describeQuorum(time.milliseconds());
@@ -584,6 +615,7 @@ public class LeaderStateTest {
         DescribeQuorumResponseData.ReplicaState observerState = 
observerStates.get(0);
         assertEquals(new DescribeQuorumResponseData.ReplicaState()
                 .setReplicaId(observerId)
+                .setReplicaDirectoryId(observerDirectoryId)
                 .setLogEndOffset(epochStartOffset + 1)
                 .setLastFetchTimestamp(observerFetchTimeMs)
                 .setLastCaughtUpTimestamp(observerFetchTimeMs),
@@ -600,15 +632,15 @@ public class LeaderStateTest {
 
         Set<Integer> voterSet = mkSet(leader, node1, node2);
         LeaderState<?> state = newLeaderState(voterSet, epochStartOffset);
-        assertFalse(state.updateLocalState(new 
LogOffsetMetadata(epochStartOffset + 1), voterSet));
-        assertTrue(state.updateReplicaState(node2, 0, new 
LogOffsetMetadata(epochStartOffset + 1)));
+        assertFalse(state.updateLocalState(new 
LogOffsetMetadata(epochStartOffset + 1), toMap(voterSet)));
+        assertTrue(state.updateReplicaState(node2, Uuid.randomUuid(), 0, new 
LogOffsetMetadata(epochStartOffset + 1)));
         assertEquals(Optional.of(new LogOffsetMetadata(epochStartOffset + 1)), 
state.highWatermark());
 
         // node1 becomes an observer
         long fetchTimeMs = time.milliseconds();
-        assertFalse(state.updateReplicaState(node1, fetchTimeMs, new 
LogOffsetMetadata(epochStartOffset + 1)));
+        assertFalse(state.updateReplicaState(node1, Uuid.randomUuid(), 
fetchTimeMs, new LogOffsetMetadata(epochStartOffset + 1)));
         Set<Integer> voterSetWithoutNode1 = mkSet(leader, node2);
-        state.updateLocalState(new LogOffsetMetadata(epochStartOffset + 5), 
voterSetWithoutNode1);
+        state.updateLocalState(new LogOffsetMetadata(epochStartOffset + 5), 
toMap(voterSetWithoutNode1));
 
 
         time.sleep(500);
@@ -624,11 +656,11 @@ public class LeaderStateTest {
         // node1 catches up with leader, HW should not change
         time.sleep(500);
         fetchTimeMs = time.milliseconds();
-        assertFalse(state.updateReplicaState(node1, fetchTimeMs, new 
LogOffsetMetadata(epochStartOffset + 5)));
+        assertFalse(state.updateReplicaState(node1, Uuid.randomUuid(), 
fetchTimeMs, new LogOffsetMetadata(epochStartOffset + 5)));
         assertEquals(Optional.of(new LogOffsetMetadata(epochStartOffset + 1)), 
state.highWatermark());
 
         // node1 becomes a voter again, HW should change
-        assertTrue(state.updateLocalState(new 
LogOffsetMetadata(epochStartOffset + 10), voterSet));
+        assertTrue(state.updateLocalState(new 
LogOffsetMetadata(epochStartOffset + 10), toMap(voterSet)));
 
         time.sleep(500);
         partitionData = state.describeQuorum(time.milliseconds());
@@ -653,13 +685,13 @@ public class LeaderStateTest {
 
         Set<Integer> voterSet = mkSet(localId, followerId);
         LeaderState<?> state = newLeaderState(voterSet, epochStartOffset);
-        assertFalse(state.updateLocalState(new 
LogOffsetMetadata(epochStartOffset + 1), voterSet));
-        assertTrue(state.updateReplicaState(followerId, time.milliseconds(), 
new LogOffsetMetadata(epochStartOffset + 1)));
+        assertFalse(state.updateLocalState(new 
LogOffsetMetadata(epochStartOffset + 1), toMap(voterSet)));
+        assertTrue(state.updateReplicaState(followerId, Uuid.randomUuid(), 
time.milliseconds(), new LogOffsetMetadata(epochStartOffset + 1)));
 
         // observer is returned since its lastFetchTimestamp is within 
OBSERVER_SESSION_TIMEOUT_MS
         time.sleep(500);
         long observerFetchTimeMs = time.milliseconds();
-        assertFalse(state.updateReplicaState(observerId, observerFetchTimeMs, 
new LogOffsetMetadata(epochStartOffset + 1)));
+        assertFalse(state.updateReplicaState(observerId, Uuid.randomUuid(), 
observerFetchTimeMs, new LogOffsetMetadata(epochStartOffset + 1)));
 
         time.sleep(500);
         DescribeQuorumResponseData.PartitionData partitionData = 
state.describeQuorum(time.milliseconds());
@@ -678,8 +710,8 @@ public class LeaderStateTest {
         assertEquals(0, partitionData.observers().size());
 
         // leader becomes observer
-        Set<Integer> voterSetWithoutLeader = singleton(followerId);
-        assertFalse(state.updateLocalState(new 
LogOffsetMetadata(epochStartOffset + 10), voterSetWithoutLeader));
+        Set<Integer> voterSetWithoutLeader = mkSet(followerId);
+        assertFalse(state.updateLocalState(new 
LogOffsetMetadata(epochStartOffset + 10), toMap(voterSetWithoutLeader)));
 
         // leader should be returned in describe quorum output
         time.sleep(LeaderState.OBSERVER_SESSION_TIMEOUT_MS);
@@ -752,7 +784,7 @@ public class LeaderStateTest {
         long epochStartOffset = 10L;
 
         LeaderState<?> state = newLeaderState(mkSet(localId), 
epochStartOffset);
-        assertFalse(state.updateReplicaState(replicaId, 0, new 
LogOffsetMetadata(epochStartOffset)));
+        assertFalse(state.updateReplicaState(replicaId, Uuid.randomUuid(), 0, 
new LogOffsetMetadata(epochStartOffset)));
 
         DescribeQuorumResponseData.PartitionData partitionData = 
state.describeQuorum(time.milliseconds());
         List<DescribeQuorumResponseData.ReplicaState> observerStates = 
partitionData.observers();
@@ -766,7 +798,7 @@ public class LeaderStateTest {
         long epochStartOffset = 10L;
         LeaderState<?> state = newLeaderState(mkSet(localId), 
epochStartOffset);
 
-        state.updateReplicaState(observerId, time.milliseconds(), new 
LogOffsetMetadata(epochStartOffset));
+        state.updateReplicaState(observerId, Uuid.randomUuid(), 
time.milliseconds(), new LogOffsetMetadata(epochStartOffset));
         DescribeQuorumResponseData.PartitionData partitionData = 
state.describeQuorum(time.milliseconds());
         List<DescribeQuorumResponseData.ReplicaState> observerStates = 
partitionData.observers();
         assertEquals(1, observerStates.size());
@@ -782,7 +814,7 @@ public class LeaderStateTest {
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     public void testGrantVote(boolean isLogUpToDate) {
-        LeaderState<?> state = newLeaderState(Utils.mkSet(1, 2, 3), 1);
+        LeaderState<?> state = newLeaderState(mkSet(1, 2, 3), 1);
 
         assertFalse(state.canGrantVote(ReplicaKey.of(1, Optional.empty()), 
isLogUpToDate));
         assertFalse(state.canGrantVote(ReplicaKey.of(2, Optional.empty()), 
isLogUpToDate));
@@ -840,4 +872,11 @@ public class LeaderStateTest {
             ));
     }
 
+    private Map<Integer, VoterSet.VoterNode> toMap(Set<Integer> data) {
+        return data.stream().collect(Collectors.toMap(Function.identity(), id 
-> voterNode(id, id == localId ? localDirectoryId : Uuid.randomUuid())));
+    }
+
+    private VoterSet.VoterNode voterNode(int id, Uuid directoryId) {
+        return new VoterSet.VoterNode(ReplicaKey.of(id, 
Optional.of(directoryId)), null, null);
+    }
 }
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java 
b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index 19f2fb61ec6..1ebc875d1d4 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -528,9 +528,24 @@ public final class RaftClientTestContext {
             .setHighWatermark(highWatermark)
             .setCurrentVoters(voterStates)
             .setObservers(observerStates);
+
+        DescribeQuorumResponseData.NodeCollection nodes = new 
DescribeQuorumResponseData.NodeCollection();
+
+        Consumer<DescribeQuorumResponseData.ReplicaState> addToNodes = 
replicaState -> {
+            if (nodes.find(replicaState.replicaId()) != null)
+                return;
+
+            nodes.add(new DescribeQuorumResponseData.Node()
+                .setNodeId(replicaState.replicaId()));
+        };
+
+        voterStates.forEach(addToNodes);
+        observerStates.forEach(addToNodes);
+
         DescribeQuorumResponseData expectedResponse = 
DescribeQuorumResponse.singletonResponse(
             metadataPartition,
-            partitionData
+            partitionData,
+            nodes
         );
         assertEquals(expectedResponse, response);
     }
@@ -595,7 +610,7 @@ public final class RaftClientTestContext {
 
     void deliverRequest(ApiMessage request) {
         RaftRequest.Inbound inboundRequest = new RaftRequest.Inbound(
-            channel.newCorrelationId(), request, time.milliseconds());
+            channel.newCorrelationId(), request.highestSupportedVersion(), 
request, time.milliseconds());
         inboundRequest.completion.whenComplete((response, exception) -> {
             if (exception != null) {
                 throw new RuntimeException(exception);
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java 
b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
index 4896571c22e..2c0fd1eff2b 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
@@ -1235,7 +1235,7 @@ public class RaftEventSimulationTest {
 
             int correlationId = outbound.correlationId();
             Node destination = outbound.destination();
-            RaftRequest.Inbound inbound = new 
RaftRequest.Inbound(correlationId, outbound.data(),
+            RaftRequest.Inbound inbound = new 
RaftRequest.Inbound(correlationId, outbound.apiVersion(), outbound.data(),
                 cluster.time.milliseconds());
 
             if (!filters.get(destination.id()).acceptInbound(inbound))
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java 
b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java
index ce59d587b9d..f84e81b18dd 100644
--- 
a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java
+++ 
b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java
@@ -147,8 +147,8 @@ public class KafkaRaftMetricsTest {
         assertEquals((double) 1, getMetric(metrics, 
"current-epoch").metricValue());
         assertEquals((double) -1L, getMetric(metrics, 
"high-watermark").metricValue());
 
-        state.leaderStateOrThrow().updateLocalState(new LogOffsetMetadata(5L), 
voters.voterIds());
-        state.leaderStateOrThrow().updateReplicaState(1, 0, new 
LogOffsetMetadata(5L));
+        state.leaderStateOrThrow().updateLocalState(new LogOffsetMetadata(5L), 
voters.voters());
+        state.leaderStateOrThrow().updateReplicaState(1, Uuid.randomUuid(), 0, 
new LogOffsetMetadata(5L));
         assertEquals((double) 5L, getMetric(metrics, 
"high-watermark").metricValue());
 
         state.transitionToFollower(2, voters.voterNode(1, 
VoterSetTest.DEFAULT_LISTENER_NAME).get());

Reply via email to