This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 81edb74c5ead06aa89311fbd4d10ebaa6ef04d2a
Author: José Armando García Sancio <[email protected]>
AuthorDate: Tue Jul 30 18:31:01 2024 +0000

    KAFKA-16533; Update voter handling
    
    Add support for handling the update voter RPC. The update voter RPC is used 
to automatically update
    the voter's supported kraft versions and available endpoints as the operator 
upgrades and
    reconfigures the KRaft controllers.
    
    The update voter RPC is handled as follows:
    
    1. Check that the leader has fenced the previous leader(s) by checking that 
the HWM is known;
       otherwise, return the REQUEST_TIMED_OUT error.
    
    2. Check that the cluster supports kraft.version 1; otherwise, return the 
UNSUPPORTED_VERSION error.
    
    3. Check that there are no uncommitted voter changes; otherwise, return the 
REQUEST_TIMED_OUT error.
    
    4. Check that the updated voter still supports the currently finalized 
kraft.version; otherwise
       return the INVALID_REQUEST error.
    
    5. Check that the updated voter is still listening on the default listener; otherwise,
       return the INVALID_REQUEST error.
    
    6. Append the updated VotersRecord to the log. The KRaft internal listener 
will read this
       uncommitted record from the log and update the voter in the set of 
voters.
    
    7. Wait for the VotersRecord to commit using the majority of the voters. 
Return a REQUEST_TIMED_OUT
       error if it doesn't commit in time.
    
    8. Send the UpdateVoter successful response to the voter.
    
    This change also implements the ability for the leader to update its own 
entry in the voter
    set when it becomes leader for an epoch. This is done by updating the voter 
set and writing a
    control batch as the first batch in a new leader epoch.
    
    Finally, fix a bug in KafkaAdminClient's handling of 
removeRaftVoterResponse where we tried to cast
    the response to the wrong type.
    
    Reviewers: Alyssa Huang <[email protected]>, Colin P. McCabe 
<[email protected]>
---
 .../kafka/clients/admin/KafkaAdminClient.java      |   3 +-
 .../common/message/UpdateRaftVoterRequest.json     |   2 +
 .../common/message/UpdateRaftVoterResponse.json    |  12 +-
 .../kafka/common/requests/RequestResponseTest.java |  16 +-
 core/src/main/scala/kafka/raft/RaftManager.scala   |   6 +-
 .../main/scala/kafka/server/ControllerApis.scala   |   2 +-
 .../main/java/org/apache/kafka/raft/Endpoints.java |  28 +
 .../org/apache/kafka/raft/KafkaRaftClient.java     | 118 +++-
 .../java/org/apache/kafka/raft/LeaderState.java    | 137 ++--
 .../java/org/apache/kafka/raft/QuorumState.java    |   5 +
 .../main/java/org/apache/kafka/raft/RaftUtil.java  |  57 +-
 .../kafka/raft/internals/AddVoterHandler.java      |   4 +-
 .../kafka/raft/internals/AddVoterHandlerState.java |   9 +-
 .../kafka/raft/internals/RemoveVoterHandler.java   |   6 +-
 .../raft/internals/RemoveVoterHandlerState.java    |   9 +-
 .../kafka/raft/internals/UpdateVoterHandler.java   | 233 ++++++
 ...dlerState.java => UpdateVoterHandlerState.java} |  39 +-
 .../org/apache/kafka/raft/internals/VoterSet.java  |  33 +
 .../kafka/raft/KafkaRaftClientReconfigTest.java    | 783 ++++++++++++++++++++-
 .../org/apache/kafka/raft/KafkaRaftClientTest.java |   7 +-
 .../org/apache/kafka/raft/LeaderStateTest.java     |   3 +
 .../org/apache/kafka/raft/QuorumStateTest.java     |   2 +
 .../apache/kafka/raft/RaftClientTestContext.java   | 122 +++-
 .../apache/kafka/raft/RaftEventSimulationTest.java |   2 +
 .../kafka/raft/internals/KafkaRaftMetricsTest.java |   2 +
 .../apache/kafka/raft/internals/VoterSetTest.java  |  34 +-
 .../org/apache/kafka/server/common/Features.java   |   9 +
 .../apache/kafka/server/common/KRaftVersion.java   |  18 +-
 .../kafka/server/common/KRaftVersionTest.java      |  53 +-
 29 files changed, 1642 insertions(+), 112 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 2f195489add..3672b61647d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -239,6 +239,7 @@ import 
org.apache.kafka.common.requests.ListPartitionReassignmentsResponse;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.RemoveRaftVoterRequest;
+import org.apache.kafka.common.requests.RemoveRaftVoterResponse;
 import org.apache.kafka.common.requests.RenewDelegationTokenRequest;
 import org.apache.kafka.common.requests.RenewDelegationTokenResponse;
 import org.apache.kafka.common.requests.UnregisterBrokerRequest;
@@ -4716,7 +4717,7 @@ public class KafkaAdminClient extends AdminClient {
 
             @Override
             void handleResponse(AbstractResponse response) {
-                AddRaftVoterResponse addResponse = (AddRaftVoterResponse) 
response;
+                RemoveRaftVoterResponse addResponse = 
(RemoveRaftVoterResponse) response;
                 if (addResponse.data().errorCode() != Errors.NONE.code()) {
                     ApiError error = new ApiError(
                             addResponse.data().errorCode(),
diff --git 
a/clients/src/main/resources/common/message/UpdateRaftVoterRequest.json 
b/clients/src/main/resources/common/message/UpdateRaftVoterRequest.json
index 80ee58a43a3..5dde41cff56 100644
--- a/clients/src/main/resources/common/message/UpdateRaftVoterRequest.json
+++ b/clients/src/main/resources/common/message/UpdateRaftVoterRequest.json
@@ -22,6 +22,8 @@
   "flexibleVersions": "0+",
   "fields": [
     { "name": "ClusterId", "type": "string", "versions": "0+" },
+    { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "0+",
+      "about": "The current leader epoch of the partition, -1 for unknown 
leader epoch" },
     { "name": "VoterId", "type": "int32", "versions": "0+",
       "about": "The replica id of the voter getting updated in the topic 
partition" },
     { "name": "VoterDirectoryId", "type": "uuid", "versions": "0+",
diff --git 
a/clients/src/main/resources/common/message/UpdateRaftVoterResponse.json 
b/clients/src/main/resources/common/message/UpdateRaftVoterResponse.json
index 64816406c74..33b49c37198 100644
--- a/clients/src/main/resources/common/message/UpdateRaftVoterResponse.json
+++ b/clients/src/main/resources/common/message/UpdateRaftVoterResponse.json
@@ -23,6 +23,16 @@
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
       "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
     { "name": "ErrorCode", "type": "int16", "versions": "0+",
-      "about": "The error code, or 0 if there was no error" }
+      "about": "The error code, or 0 if there was no error" },
+    { "name": "CurrentLeader", "type": "CurrentLeader", "versions": "0+",
+      "taggedVersions": "0+", "tag": 0, "fields": [
+        { "name": "LeaderId", "type": "int32", "versions": "0+", "default": 
"-1", "entityType" : "brokerId",
+          "about": "The replica id of the current leader or -1 if the leader 
is unknown" },
+        { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": 
"-1",
+          "about": "The latest known leader epoch" },
+        { "name": "Host", "type": "string", "versions": "0+", "about": "The 
node's hostname" },
+        { "name": "Port", "type": "int32", "versions": "0+", "about": "The 
node's port" }
+      ]
+    }
   ]
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 54e681b468b..7aee4991635 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -1365,8 +1365,16 @@ public class RequestResponseTest {
     }
 
     private UpdateRaftVoterResponse createUpdateRaftVoterResponse() {
-        return new UpdateRaftVoterResponse(new UpdateRaftVoterResponseData().
-            setErrorCode((short) 0));
+        return new UpdateRaftVoterResponse(
+            new UpdateRaftVoterResponseData()
+                .setErrorCode((short) 0)
+                .setCurrentLeader(new 
UpdateRaftVoterResponseData.CurrentLeader()
+                    .setLeaderId(1)
+                    .setLeaderEpoch(2)
+                    .setHost("localhost")
+                    .setPort(9999)
+                )
+        );
     }
 
     private DescribeTopicPartitionsResponse 
createDescribeTopicPartitionsResponse() {
@@ -3020,7 +3028,7 @@ public class RequestResponseTest {
                             .setName("topic")
                             
.setPartitions(Collections.singletonList(73))).iterator())))
                     .iterator());
-            return 
AddPartitionsToTxnRequest.Builder.forBroker(transactions).build(version);  
+            return 
AddPartitionsToTxnRequest.Builder.forBroker(transactions).build(version);
         }
     }
 
@@ -3029,7 +3037,7 @@ public class RequestResponseTest {
         AddPartitionsToTxnResponseData.AddPartitionsToTxnResult result = 
AddPartitionsToTxnResponse.resultForTransaction(
                 txnId, Collections.singletonMap(new TopicPartition("t", 0), 
Errors.NONE));
         AddPartitionsToTxnResponseData data = new 
AddPartitionsToTxnResponseData().setThrottleTimeMs(0);
-        
+
         if (version < 4) {
             data.setResultsByTopicV3AndBelow(result.topicResults());
         } else {
diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala 
b/core/src/main/scala/kafka/raft/RaftManager.scala
index caa77a9fd6d..25c8e8b294b 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -45,6 +45,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.{LogContext, Time, Utils}
 import org.apache.kafka.raft.{Endpoints, FileQuorumStateStore, 
KafkaNetworkChannel, KafkaRaftClient, KafkaRaftClientDriver, LeaderAndEpoch, 
QuorumConfig, RaftClient, ReplicatedLog}
 import org.apache.kafka.server.ProcessRole
+import org.apache.kafka.server.common.Features
 import org.apache.kafka.server.common.serialization.RecordSerde
 import org.apache.kafka.server.util.{FileLock, KafkaScheduler}
 import org.apache.kafka.server.fault.FaultHandler
@@ -153,7 +154,7 @@ class KafkaRaftManager[T](
   threadNamePrefixOpt: Option[String],
   val controllerQuorumVotersFuture: CompletableFuture[JMap[Integer, 
InetSocketAddress]],
   bootstrapServers: JCollection[InetSocketAddress],
-  controllerListeners: Endpoints,
+  localListeners: Endpoints,
   fatalFaultHandler: FaultHandler
 ) extends RaftManager[T] with Logging {
 
@@ -236,7 +237,8 @@ class KafkaRaftManager[T](
       logContext,
       clusterId,
       bootstrapServers,
-      controllerListeners,
+      localListeners,
+      Features.KRAFT_VERSION.supportedVersionRange(),
       raftConfig
     )
   }
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala 
b/core/src/main/scala/kafka/server/ControllerApis.scala
index 8a99cd9eed9..ae4980f4354 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -1095,6 +1095,6 @@ class ControllerApis(
 
   def handleUpdateRaftVoter(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
     authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
-    throw new UnsupportedVersionException("handleUpdateRaftVoter is not 
supported yet.")
+    handleRaftRequest(request, response => new 
UpdateRaftVoterResponse(response.asInstanceOf[UpdateRaftVoterResponseData]))
   }
 }
diff --git a/raft/src/main/java/org/apache/kafka/raft/Endpoints.java 
b/raft/src/main/java/org/apache/kafka/raft/Endpoints.java
index 5e979022ec3..8dd05a5ae85 100644
--- a/raft/src/main/java/org/apache/kafka/raft/Endpoints.java
+++ b/raft/src/main/java/org/apache/kafka/raft/Endpoints.java
@@ -24,6 +24,7 @@ import 
org.apache.kafka.common.message.EndQuorumEpochRequestData;
 import org.apache.kafka.common.message.EndQuorumEpochResponseData;
 import org.apache.kafka.common.message.FetchResponseData;
 import org.apache.kafka.common.message.FetchSnapshotResponseData;
+import org.apache.kafka.common.message.UpdateRaftVoterRequestData;
 import org.apache.kafka.common.message.VoteResponseData;
 import org.apache.kafka.common.message.VotersRecord;
 import org.apache.kafka.common.network.ListenerName;
@@ -131,6 +132,21 @@ public final class Endpoints {
         return listeners;
     }
 
+    public UpdateRaftVoterRequestData.ListenerCollection 
toUpdateVoterRequest() {
+        UpdateRaftVoterRequestData.ListenerCollection listeners =
+            new 
UpdateRaftVoterRequestData.ListenerCollection(endpoints.size());
+        for (Map.Entry<ListenerName, InetSocketAddress> entry : 
endpoints.entrySet()) {
+            listeners.add(
+                new UpdateRaftVoterRequestData.Listener()
+                    .setName(entry.getKey().value())
+                    .setHost(entry.getValue().getHostString())
+                    .setPort(entry.getValue().getPort())
+            );
+        }
+
+        return listeners;
+    }
+
     private static final Endpoints EMPTY = new 
Endpoints(Collections.emptyMap());
     public static Endpoints empty() {
         return EMPTY;
@@ -272,4 +288,16 @@ public final class Endpoints {
 
         return new Endpoints(listeners);
     }
+
+    public static Endpoints 
fromUpdateVoterRequest(UpdateRaftVoterRequestData.ListenerCollection endpoints) 
{
+        Map<ListenerName, InetSocketAddress> listeners = new 
HashMap<>(endpoints.size());
+        for (UpdateRaftVoterRequestData.Listener endpoint : endpoints) {
+            listeners.put(
+                ListenerName.normalised(endpoint.name()),
+                InetSocketAddress.createUnresolved(endpoint.host(), 
endpoint.port())
+            );
+        }
+
+        return new Endpoints(listeners);
+    }
 }
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java 
b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 0b454666db3..ed11e45b520 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.compress.Compression;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
+import org.apache.kafka.common.feature.SupportedVersionRange;
 import org.apache.kafka.common.memory.MemoryPool;
 import org.apache.kafka.common.message.AddRaftVoterRequestData;
 import org.apache.kafka.common.message.AddRaftVoterResponseData;
@@ -39,6 +40,8 @@ import 
org.apache.kafka.common.message.FetchSnapshotRequestData;
 import org.apache.kafka.common.message.FetchSnapshotResponseData;
 import org.apache.kafka.common.message.RemoveRaftVoterRequestData;
 import org.apache.kafka.common.message.RemoveRaftVoterResponseData;
+import org.apache.kafka.common.message.UpdateRaftVoterRequestData;
+import org.apache.kafka.common.message.UpdateRaftVoterResponseData;
 import org.apache.kafka.common.message.VoteRequestData;
 import org.apache.kafka.common.message.VoteResponseData;
 import org.apache.kafka.common.metrics.Metrics;
@@ -76,6 +79,7 @@ import org.apache.kafka.raft.internals.RecordsBatchReader;
 import org.apache.kafka.raft.internals.RemoveVoterHandler;
 import org.apache.kafka.raft.internals.ReplicaKey;
 import org.apache.kafka.raft.internals.ThresholdPurgatory;
+import org.apache.kafka.raft.internals.UpdateVoterHandler;
 import org.apache.kafka.raft.internals.VoterSet;
 import org.apache.kafka.server.common.KRaftVersion;
 import org.apache.kafka.server.common.serialization.RecordSerde;
@@ -171,6 +175,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
     private final int fetchMaxWaitMs;
     private final String clusterId;
     private final Endpoints localListeners;
+    private final SupportedVersionRange localSupportedKRaftVersion;
     private final NetworkChannel channel;
     private final ReplicatedLog log;
     private final Random random;
@@ -207,6 +212,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
     // Specialized handlers
     private volatile AddVoterHandler addVoterHandler;
     private volatile RemoveVoterHandler removeVoterHandler;
+    private volatile UpdateVoterHandler updateVoterHandler;
 
     /**
      * Create a new instance.
@@ -226,6 +232,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
         String clusterId,
         Collection<InetSocketAddress> bootstrapServers,
         Endpoints localListeners,
+        SupportedVersionRange localSupportedKRaftVersion,
         QuorumConfig quorumConfig
     ) {
         this(
@@ -242,6 +249,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
             clusterId,
             bootstrapServers,
             localListeners,
+            localSupportedKRaftVersion,
             logContext,
             new Random(),
             quorumConfig
@@ -262,6 +270,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
         String clusterId,
         Collection<InetSocketAddress> bootstrapServers,
         Endpoints localListeners,
+        SupportedVersionRange localSupportedKRaftVersion,
         LogContext logContext,
         Random random,
         QuorumConfig quorumConfig
@@ -279,6 +288,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
         this.time = time;
         this.clusterId = clusterId;
         this.localListeners = localListeners;
+        this.localSupportedKRaftVersion = localSupportedKRaftVersion;
         this.fetchMaxWaitMs = fetchMaxWaitMs;
         this.logger = logContext.logger(KafkaRaftClient.class);
         this.random = random;
@@ -349,6 +359,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
             // add or remove voter request that need to be completed
             addVoterHandler.highWatermarkUpdated(state);
             removeVoterHandler.highWatermarkUpdated(state);
+            updateVoterHandler.highWatermarkUpdated(state);
 
             // After updating the high watermark, we first clear the append
             // purgatory so that we have an opportunity to route the pending
@@ -492,6 +503,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
             nodeDirectoryId,
             partitionState,
             localListeners,
+            localSupportedKRaftVersion,
             quorumConfig.electionTimeoutMs(),
             quorumConfig.fetchTimeoutMs(),
             quorumStateStore,
@@ -543,6 +555,15 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
             quorumConfig.requestTimeoutMs(),
             logContext
         );
+
+        // Specialized update voter handler
+        this.updateVoterHandler = new UpdateVoterHandler(
+            nodeId,
+            partitionState,
+            channel.listenerName(),
+            time,
+            quorumConfig.requestTimeoutMs()
+        );
     }
 
     @Override
@@ -2048,10 +2069,10 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
             );
         }
 
-        Optional<Errors> leaderValidation = 
validateLeaderOnlyRequest(quorum.epoch());
-        if (leaderValidation.isPresent()) {
+        Optional<Errors> leaderValidationError = 
validateLeaderOnlyRequest(quorum.epoch());
+        if (leaderValidationError.isPresent()) {
             return completedFuture(
-                new 
AddRaftVoterResponseData().setErrorCode(leaderValidation.get().code())
+                new 
AddRaftVoterResponseData().setErrorCode(leaderValidationError.get().code())
             );
         }
 
@@ -2071,7 +2092,8 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
                     .setErrorCode(Errors.INVALID_REQUEST.code())
                     .setErrorMessage(
                         String.format(
-                            "Add voter request didn't include the default 
listener: %s",
+                            "Add voter request didn't include the endpoint 
(%s) for the default listener %s",
+                            newVoterEndpoints,
                             channel.listenerName()
                         )
                     )
@@ -2130,10 +2152,10 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
             );
         }
 
-        Optional<Errors> leaderValidation = 
validateLeaderOnlyRequest(quorum.epoch());
-        if (leaderValidation.isPresent()) {
+        Optional<Errors> leaderValidationError = 
validateLeaderOnlyRequest(quorum.epoch());
+        if (leaderValidationError.isPresent()) {
             return completedFuture(
-                new 
RemoveRaftVoterResponseData().setErrorCode(leaderValidation.get().code())
+                new 
RemoveRaftVoterResponseData().setErrorCode(leaderValidationError.get().code())
             );
         }
 
@@ -2153,6 +2175,84 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
         );
     }
 
+    private CompletableFuture<UpdateRaftVoterResponseData> 
handleUpdateVoterRequest(
+        RaftRequest.Inbound requestMetadata,
+        long currentTimeMs
+    ) {
+        UpdateRaftVoterRequestData data = (UpdateRaftVoterRequestData) 
requestMetadata.data();
+
+        if (!hasValidClusterId(data.clusterId())) {
+            return completedFuture(
+                RaftUtil.updateVoterResponse(
+                    Errors.INCONSISTENT_CLUSTER_ID,
+                    requestMetadata.listenerName(),
+                    quorum.leaderAndEpoch(),
+                    quorum.leaderEndpoints()
+                )
+            );
+        }
+
+        Optional<Errors> leaderValidationError = 
validateLeaderOnlyRequest(data.currentLeaderEpoch());
+        if (leaderValidationError.isPresent()) {
+            return completedFuture(
+                RaftUtil.updateVoterResponse(
+                    leaderValidationError.get(),
+                    requestMetadata.listenerName(),
+                    quorum.leaderAndEpoch(),
+                    quorum.leaderEndpoints()
+                )
+            );
+        }
+
+        Optional<ReplicaKey> voter = RaftUtil.updateVoterRequestVoterKey(data);
+        if (!voter.isPresent() || !voter.get().directoryId().isPresent()) {
+            return completedFuture(
+                RaftUtil.updateVoterResponse(
+                    Errors.INVALID_REQUEST,
+                    requestMetadata.listenerName(),
+                    quorum.leaderAndEpoch(),
+                    quorum.leaderEndpoints()
+                )
+            );
+        }
+
+        Endpoints voterEndpoints = 
Endpoints.fromUpdateVoterRequest(data.listeners());
+        if (!voterEndpoints.address(channel.listenerName()).isPresent()) {
+            return completedFuture(
+                RaftUtil.updateVoterResponse(
+                    Errors.INVALID_REQUEST,
+                    requestMetadata.listenerName(),
+                    quorum.leaderAndEpoch(),
+                    quorum.leaderEndpoints()
+                )
+            );
+        }
+
+        UpdateRaftVoterRequestData.KRaftVersionFeature supportedKraftVersions 
= data.kRaftVersionFeature();
+        if (supportedKraftVersions.minSupportedVersion() < 0 ||
+            supportedKraftVersions.maxSupportedVersion() < 0 ||
+            supportedKraftVersions.maxSupportedVersion() < 
supportedKraftVersions.minSupportedVersion()
+        ) {
+            return completedFuture(
+                RaftUtil.updateVoterResponse(
+                    Errors.INVALID_REQUEST,
+                    requestMetadata.listenerName(),
+                    quorum.leaderAndEpoch(),
+                    quorum.leaderEndpoints()
+                )
+            );
+        }
+
+        return updateVoterHandler.handleUpdateVoterRequest(
+            quorum.leaderStateOrThrow(),
+            requestMetadata.listenerName(),
+            voter.get(),
+            voterEndpoints,
+            supportedKraftVersions,
+            currentTimeMs
+        );
+    }
+
     private boolean hasConsistentLeader(int epoch, OptionalInt leaderId) {
         // Only elected leaders are sent in the request/response header, so if 
we have an elected
         // leaderId, it should be consistent with what is in the message.
@@ -2419,6 +2519,10 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
                 responseFuture = handleRemoveVoterRequest(request, 
currentTimeMs);
                 break;
 
+            case UPDATE_RAFT_VOTER:
+                responseFuture = handleUpdateVoterRequest(request, 
currentTimeMs);
+                break;
+
             default:
                 throw new IllegalArgumentException("Unexpected request type " 
+ apiKey);
         }
diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java 
b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
index 06deb28c69c..ecc0e68a0f8 100644
--- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.raft;
 
+import org.apache.kafka.common.feature.SupportedVersionRange;
 import org.apache.kafka.common.message.KRaftVersionRecord;
 import org.apache.kafka.common.message.LeaderChangeMessage;
 import org.apache.kafka.common.message.LeaderChangeMessage.Voter;
@@ -31,6 +32,7 @@ import org.apache.kafka.raft.internals.AddVoterHandlerState;
 import org.apache.kafka.raft.internals.BatchAccumulator;
 import org.apache.kafka.raft.internals.RemoveVoterHandlerState;
 import org.apache.kafka.raft.internals.ReplicaKey;
+import org.apache.kafka.raft.internals.UpdateVoterHandlerState;
 import org.apache.kafka.raft.internals.VoterSet;
 import org.apache.kafka.server.common.KRaftVersion;
 
@@ -44,6 +46,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.OptionalInt;
 import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -64,7 +67,8 @@ public class LeaderState<T> implements EpochState {
     private final int epoch;
     private final long epochStartOffset;
     private final Set<Integer> grantingVoters;
-    private final Endpoints endpoints;
+    private final Endpoints localListeners;
+    private final SupportedVersionRange localSupportedKRaftVersion;
     private final VoterSet voterSetAtEpochStart;
     // This field is non-empty if the voter set at epoch start came from a 
snapshot or log segment
     private final OptionalLong offsetOfVotersAtEpochStart;
@@ -74,6 +78,8 @@ public class LeaderState<T> implements EpochState {
     private Map<Integer, ReplicaState> voterStates = new HashMap<>();
     private Optional<AddVoterHandlerState> addVoterHandlerState = 
Optional.empty();
     private Optional<RemoveVoterHandlerState> removeVoterHandlerState = 
Optional.empty();
+    private Optional<UpdateVoterHandlerState> updateVoterHandlerState = 
Optional.empty();
+
 
     private final Map<ReplicaKey, ReplicaState> observerStates = new 
HashMap<>();
     private final Logger log;
@@ -98,14 +104,16 @@ public class LeaderState<T> implements EpochState {
         KRaftVersion kraftVersionAtEpochStart,
         Set<Integer> grantingVoters,
         BatchAccumulator<T> accumulator,
-        Endpoints endpoints,
+        Endpoints localListeners,
+        SupportedVersionRange localSupportedKRaftVersion,
         int fetchTimeoutMs,
         LogContext logContext
     ) {
         this.localReplicaKey = localReplicaKey;
         this.epoch = epoch;
         this.epochStartOffset = epochStartOffset;
-        this.endpoints = endpoints;
+        this.localListeners = localListeners;
+        this.localSupportedKRaftVersion = localSupportedKRaftVersion;
 
         for (VoterSet.VoterNode voterNode: voterSetAtEpochStart.voterNodes()) {
             boolean hasAcknowledgedLeader = voterNode.isVoter(localReplicaKey);
@@ -239,8 +247,26 @@ public class LeaderState<T> implements EpochState {
         removeVoterHandlerState = state;
     }
 
+    public Optional<UpdateVoterHandlerState> updateVoterHandlerState() {
+        return updateVoterHandlerState;
+    }
+
+    public void resetUpdateVoterHandlerState(
+        Errors error,
+        Optional<UpdateVoterHandlerState> state
+    ) {
+        updateVoterHandlerState.ifPresent(
+            handlerState -> handlerState.completeFuture(
+                error,
+                new LeaderAndEpoch(OptionalInt.of(localReplicaKey.id()), 
epoch),
+                localListeners
+            )
+        );
+        updateVoterHandlerState = state;
+    }
+
     public long maybeExpirePendingOperation(long currentTimeMs) {
-        // First abort any expired operation
+        // First abort any expired operations
         long timeUntilAddVoterExpiration = addVoterHandlerState()
             .map(state -> state.timeUntilOperationExpiration(currentTimeMs))
             .orElse(Long.MAX_VALUE);
@@ -257,14 +283,27 @@ public class LeaderState<T> implements EpochState {
             resetRemoveVoterHandlerState(Errors.REQUEST_TIMED_OUT, null, 
Optional.empty());
         }
 
-        // Reread the timeouts and return the smaller of the two
+        long timeUntilUpdateVoterExpiration = updateVoterHandlerState()
+            .map(state -> state.timeUntilOperationExpiration(currentTimeMs))
+            .orElse(Long.MAX_VALUE);
+
+        if (timeUntilUpdateVoterExpiration == 0) {
+            resetUpdateVoterHandlerState(Errors.REQUEST_TIMED_OUT, 
Optional.empty());
+        }
+
+        // Reread the timeouts and return the smaller of them
         return Math.min(
             addVoterHandlerState()
                 .map(state -> 
state.timeUntilOperationExpiration(currentTimeMs))
                 .orElse(Long.MAX_VALUE),
-            removeVoterHandlerState()
-                .map(state -> 
state.timeUntilOperationExpiration(currentTimeMs))
-                .orElse(Long.MAX_VALUE)
+            Math.min(
+                removeVoterHandlerState()
+                    .map(state -> 
state.timeUntilOperationExpiration(currentTimeMs))
+                    .orElse(Long.MAX_VALUE),
+                updateVoterHandlerState()
+                    .map(state -> 
state.timeUntilOperationExpiration(currentTimeMs))
+                    .orElse(Long.MAX_VALUE)
+            )
         );
     }
 
@@ -308,36 +347,54 @@ public class LeaderState<T> implements EpochState {
             ) {
                 builder.appendLeaderChangeMessage(currentTimeMs, 
leaderChangeMessage);
 
-                offsetOfVotersAtEpochStart.ifPresent(offset -> {
-                    if (offset == -1) {
-                        // Latest voter set came from the bootstrap checkpoint 
(0-0.checkpoint)
-                        // rewrite the voter set to the log so that it is 
replicated to the replicas.
-                        if (!kraftVersionAtEpochStart.isReconfigSupported()) {
-                            throw new IllegalStateException(
-                                String.format(
-                                    "The bootstrap checkpoint contains a set 
of voters %s at %s " +
-                                    "and the KRaft version is %s",
-                                    voterSetAtEpochStart,
-                                    offset,
-                                    kraftVersionAtEpochStart
-                                )
-                            );
-                        } else {
-                            builder.appendKRaftVersionMessage(
-                                currentTimeMs,
-                                new KRaftVersionRecord()
-                                    
.setVersion(ControlRecordUtils.KRAFT_VERSION_CURRENT_VERSION)
-                                    
.setKRaftVersion(kraftVersionAtEpochStart.featureLevel())
-                            );
-                            builder.appendVotersMessage(
-                                currentTimeMs,
-                                voterSetAtEpochStart.toVotersRecord(
-                                    
ControlRecordUtils.KRAFT_VOTERS_CURRENT_VERSION
+                if (kraftVersionAtEpochStart.isReconfigSupported()) {
+                    long offset = offsetOfVotersAtEpochStart.orElseThrow(
+                        () -> new IllegalStateException(
+                            String.format(
+                                "The %s is %s but there is no voter set in the 
log or " +
+                                "checkpoint %s",
+                                KRaftVersion.FEATURE_NAME,
+                                kraftVersionAtEpochStart,
+                                voterSetAtEpochStart
+                            )
+                        )
+                    );
+
+                    VoterSet.VoterNode updatedVoterNode = 
VoterSet.VoterNode.of(
+                        localReplicaKey,
+                        localListeners,
+                        localSupportedKRaftVersion
+                    );
+
+                    // The leader should write the latest voters record if its 
local listeners are different
+                    // or it has never written a voters record to the log 
before.
+                    if (offset == -1 || 
voterSetAtEpochStart.voterNodeNeedsUpdate(updatedVoterNode)) {
+                        VoterSet updatedVoterSet = voterSetAtEpochStart
+                            .updateVoter(updatedVoterNode)
+                            .orElseThrow(
+                                () -> new IllegalStateException(
+                                    String.format(
+                                        "Update expected for leader node %s 
and voter set %s",
+                                        updatedVoterNode,
+                                        voterSetAtEpochStart
+                                    )
                                 )
                             );
-                        }
+
+                        builder.appendKRaftVersionMessage(
+                            currentTimeMs,
+                            new KRaftVersionRecord()
+                                
.setVersion(kraftVersionAtEpochStart.kraftVersionRecordVersion())
+                                
.setKRaftVersion(kraftVersionAtEpochStart.featureLevel())
+                        );
+                        builder.appendVotersMessage(
+                            currentTimeMs,
+                            updatedVoterSet.toVotersRecord(
+                                kraftVersionAtEpochStart.votersRecordVersion()
+                            )
+                        );
                     }
-                });
+                }
 
                 return builder.build();
             }
@@ -390,7 +447,7 @@ public class LeaderState<T> implements EpochState {
 
     @Override
     public Endpoints leaderEndpoints() {
-        return endpoints;
+        return localListeners;
     }
 
     Map<Integer, ReplicaState> voterStates() {
@@ -826,11 +883,9 @@ public class LeaderState<T> implements EpochState {
 
     @Override
     public void close() {
-        addVoterHandlerState.ifPresent(AddVoterHandlerState::close);
-        addVoterHandlerState = Optional.empty();
-
-        removeVoterHandlerState.ifPresent(RemoveVoterHandlerState::close);
-        removeVoterHandlerState = Optional.empty();
+        resetAddVoterHandlerState(Errors.NOT_LEADER_OR_FOLLOWER, null, 
Optional.empty());
+        resetRemoveVoterHandlerState(Errors.NOT_LEADER_OR_FOLLOWER, null, 
Optional.empty());
+        resetUpdateVoterHandlerState(Errors.NOT_LEADER_OR_FOLLOWER, 
Optional.empty());
 
         accumulator.close();
     }
diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java 
b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
index d84e274990e..913a7f474d5 100644
--- a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.raft;
 
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.feature.SupportedVersionRange;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.raft.internals.BatchAccumulator;
@@ -84,6 +85,7 @@ public class QuorumState {
     private final QuorumStateStore store;
     private final KRaftControlRecordStateMachine partitionState;
     private final Endpoints localListeners;
+    private final SupportedVersionRange localSupportedKRaftVersion;
     private final Random random;
     private final int electionTimeoutMs;
     private final int fetchTimeoutMs;
@@ -96,6 +98,7 @@ public class QuorumState {
         Uuid localDirectoryId,
         KRaftControlRecordStateMachine partitionState,
         Endpoints localListeners,
+        SupportedVersionRange localSupportedKRaftVersion,
         int electionTimeoutMs,
         int fetchTimeoutMs,
         QuorumStateStore store,
@@ -107,6 +110,7 @@ public class QuorumState {
         this.localDirectoryId = localDirectoryId;
         this.partitionState = partitionState;
         this.localListeners = localListeners;
+        this.localSupportedKRaftVersion = localSupportedKRaftVersion;
         this.electionTimeoutMs = electionTimeoutMs;
         this.fetchTimeoutMs = fetchTimeoutMs;
         this.store = store;
@@ -550,6 +554,7 @@ public class QuorumState {
             candidateState.grantingVoters(),
             accumulator,
             localListeners,
+            localSupportedKRaftVersion,
             fetchTimeoutMs,
             logContext
         );
diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java 
b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
index 4d2f3bc06e8..224da441a9c 100644
--- a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
+++ b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
@@ -18,6 +18,7 @@ package org.apache.kafka.raft;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.feature.SupportedVersionRange;
 import org.apache.kafka.common.message.AddRaftVoterRequestData;
 import org.apache.kafka.common.message.AddRaftVoterResponseData;
 import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
@@ -32,6 +33,8 @@ import 
org.apache.kafka.common.message.FetchSnapshotRequestData;
 import org.apache.kafka.common.message.FetchSnapshotResponseData;
 import org.apache.kafka.common.message.RemoveRaftVoterRequestData;
 import org.apache.kafka.common.message.RemoveRaftVoterResponseData;
+import org.apache.kafka.common.message.UpdateRaftVoterRequestData;
+import org.apache.kafka.common.message.UpdateRaftVoterResponseData;
 import org.apache.kafka.common.message.VoteRequestData;
 import org.apache.kafka.common.message.VoteResponseData;
 import org.apache.kafka.common.network.ListenerName;
@@ -542,7 +545,7 @@ public class RaftUtil {
             .setErrorCode(error.code())
             .setErrorMessage(errorMessage);
     }
-    
+
     public static RemoveRaftVoterRequestData removeVoterRequest(
         String clusterId,
         ReplicaKey voter
@@ -564,6 +567,50 @@ public class RaftUtil {
             .setErrorMessage(errorMessage);
     }
 
+    public static UpdateRaftVoterRequestData updateVoterRequest(
+        String clusterId,
+        ReplicaKey voter,
+        int epoch,
+        SupportedVersionRange supportedVersions,
+        Endpoints endpoints
+    ) {
+        UpdateRaftVoterRequestData request = new UpdateRaftVoterRequestData()
+            .setClusterId(clusterId)
+            .setCurrentLeaderEpoch(epoch)
+            .setVoterId(voter.id())
+            
.setVoterDirectoryId(voter.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID))
+            .setListeners(endpoints.toUpdateVoterRequest());
+
+        request.kRaftVersionFeature()
+            .setMinSupportedVersion(supportedVersions.min())
+            .setMaxSupportedVersion(supportedVersions.max());
+
+        return request;
+    }
+
+    public static UpdateRaftVoterResponseData updateVoterResponse(
+        Errors error,
+        ListenerName listenerName,
+        LeaderAndEpoch leaderAndEpoch,
+        Endpoints endpoints
+    ) {
+        UpdateRaftVoterResponseData response = new 
UpdateRaftVoterResponseData()
+            .setErrorCode(error.code());
+
+        response.currentLeader()
+            .setLeaderId(leaderAndEpoch.leaderId().orElse(-1))
+            .setLeaderEpoch(leaderAndEpoch.epoch());
+
+        Optional<InetSocketAddress> address = endpoints.address(listenerName);
+        if (address.isPresent()) {
+            response.currentLeader()
+                .setHost(address.get().getHostString())
+                .setPort(address.get().getPort());
+        }
+
+        return response;
+    }
+
     private static List<DescribeQuorumResponseData.ReplicaState> 
toReplicaStates(
         short apiVersion,
         int leaderId,
@@ -641,6 +688,14 @@ public class RaftUtil {
         }
     }
 
+    public static Optional<ReplicaKey> 
updateVoterRequestVoterKey(UpdateRaftVoterRequestData request) {
+        if (request.voterId() < 0) {
+            return Optional.empty();
+        } else {
+            return Optional.of(ReplicaKey.of(request.voterId(), 
request.voterDirectoryId()));
+        }
+    }
+
     static boolean hasValidTopicPartition(FetchRequestData data, 
TopicPartition topicPartition, Uuid topicId) {
         return data.topics().size() == 1 &&
             data.topics().get(0).topicId().equals(topicId) &&
diff --git 
a/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandler.java 
b/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandler.java
index a504e2ec9d4..444e747b1e5 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandler.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandler.java
@@ -87,12 +87,12 @@ public final class AddVoterHandler {
         Endpoints voterEndpoints,
         long currentTimeMs
     ) {
-        // Check if there are any pending add or remove voter requests
+        // Check if there are any pending voter change requests
         if (leaderState.isOperationPending(currentTimeMs)) {
             return CompletableFuture.completedFuture(
                 RaftUtil.addVoterResponse(
                     Errors.REQUEST_TIMED_OUT,
-                    "Request timed out waiting for leader to handle previous 
add or remove voter request"
+                    "Request timed out waiting for leader to handle previous 
voter change request"
                 )
             );
         }
diff --git 
a/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandlerState.java 
b/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandlerState.java
index 278fd4ecd62..5bef0afa709 100644
--- 
a/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandlerState.java
+++ 
b/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandlerState.java
@@ -18,15 +18,13 @@
 package org.apache.kafka.raft.internals;
 
 import org.apache.kafka.common.message.AddRaftVoterResponseData;
-import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.utils.Timer;
 import org.apache.kafka.raft.Endpoints;
-import org.apache.kafka.raft.RaftUtil;
 
 import java.util.OptionalLong;
 import java.util.concurrent.CompletableFuture;
 
-public final class AddVoterHandlerState implements AutoCloseable {
+public final class AddVoterHandlerState {
     private final ReplicaKey voterKey;
     private final Endpoints voterEndpoints;
     private final Timer timeout;
@@ -84,9 +82,4 @@ public final class AddVoterHandlerState implements 
AutoCloseable {
     public CompletableFuture<AddRaftVoterResponseData> future() {
         return future;
     }
-
-    @Override
-    public void close() {
-        
future.complete(RaftUtil.addVoterResponse(Errors.NOT_LEADER_OR_FOLLOWER, null));
-    }
 }
diff --git 
a/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandler.java 
b/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandler.java
index b7f4ffb759e..3a62d383dda 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandler.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandler.java
@@ -42,7 +42,7 @@ import java.util.concurrent.CompletableFuture;
  * 2. Check that the cluster supports kraft.version 1, otherwise return the 
UNSUPPORTED_VERSION error.
  * 3. Check that there are no uncommitted voter changes, otherwise return the 
REQUEST_TIMED_OUT error.
  * 4. Append the updated VotersRecord to the log. The KRaft internal listener 
will read this
- *    uncommitted record from the log and add the new voter to the set of 
voters.
+ *    uncommitted record from the log and remove the voter from the set of 
voters.
  * 5. Wait for the VotersRecord to commit using the majority of the new set of 
voters. Return a
  *    REQUEST_TIMED_OUT error if it doesn't commit in time.
  * 6. Send the RemoveVoter successful response to the client.
@@ -77,12 +77,12 @@ public final class RemoveVoterHandler {
         ReplicaKey voterKey,
         long currentTimeMs
     ) {
-        // Check if there are any pending add or remove voter requests
+        // Check if there are any pending voter change requests
         if (leaderState.isOperationPending(currentTimeMs)) {
             return CompletableFuture.completedFuture(
                 RaftUtil.removeVoterResponse(
                     Errors.REQUEST_TIMED_OUT,
-                    "Request timed out waiting for leader to handle previous 
add or remove voter request"
+                    "Request timed out waiting for leader to handle previous 
voter change request"
                 )
             );
         }
diff --git 
a/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandlerState.java
 
b/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandlerState.java
index 670103b4346..bb9ef4cb2cc 100644
--- 
a/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandlerState.java
+++ 
b/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandlerState.java
@@ -17,13 +17,11 @@
 package org.apache.kafka.raft.internals;
 
 import org.apache.kafka.common.message.RemoveRaftVoterResponseData;
-import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.utils.Timer;
-import org.apache.kafka.raft.RaftUtil;
 
 import java.util.concurrent.CompletableFuture;
 
-public final class RemoveVoterHandlerState implements AutoCloseable {
+public final class RemoveVoterHandlerState {
     private final long lastOffset;
     private final Timer timeout;
     private final CompletableFuture<RemoveRaftVoterResponseData> future = new 
CompletableFuture<>();
@@ -45,9 +43,4 @@ public final class RemoveVoterHandlerState implements 
AutoCloseable {
     public long lastOffset() {
         return lastOffset;
     }
-
-    @Override
-    public void close() {
-        
future.complete(RaftUtil.removeVoterResponse(Errors.NOT_LEADER_OR_FOLLOWER, 
null));
-    }
 }
diff --git 
a/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandler.java 
b/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandler.java
new file mode 100644
index 00000000000..152de68f3fc
--- /dev/null
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandler.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import org.apache.kafka.common.feature.SupportedVersionRange;
+import org.apache.kafka.common.message.UpdateRaftVoterRequestData;
+import org.apache.kafka.common.message.UpdateRaftVoterResponseData;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.raft.Endpoints;
+import org.apache.kafka.raft.LeaderAndEpoch;
+import org.apache.kafka.raft.LeaderState;
+import org.apache.kafka.raft.LogOffsetMetadata;
+import org.apache.kafka.raft.RaftUtil;
+import org.apache.kafka.server.common.KRaftVersion;
+
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * This type implements the protocol for updating a voter in a KRaft 
partition.
+ *
+ * 1. Check that the leader has fenced the previous leader(s) by checking that 
the HWM is known,
+ *    otherwise return the REQUEST_TIMED_OUT error.
+ * 2. Check that the cluster supports kraft.version 1, otherwise return the 
UNSUPPORTED_VERSION error.
+ * 3. Check that there are no uncommitted voter changes, otherwise return the 
REQUEST_TIMED_OUT error.
+ * 4. Check that the updated voter still supports the currently finalized 
kraft.version, otherwise
+ *    return the INVALID_REQUEST error.
+ * 5. Check that the updated voter is still listening on the default listener.
+ * 6. Append the updated VotersRecord to the log. The KRaft internal listener 
will read this
+ *    uncommitted record from the log and update the voter in the set of 
voters.
+ * 7. Wait for the VotersRecord to commit using the majority of the voters. 
Return a
+ *    REQUEST_TIMED_OUT error if it doesn't commit in time.
+ * 8. Send the UpdateVoter successful response to the voter.
+ *
+ * KAFKA-16538 is going to add support for handling this RPC when the 
kraft.version is 0.
+ */
+public final class UpdateVoterHandler {
+    private final OptionalInt localId;
+    private final KRaftControlRecordStateMachine partitionState;
+    private final ListenerName defaultListenerName;
+    private final Time time;
+    private final long requestTimeoutMs;
+
+    public UpdateVoterHandler(
+        OptionalInt localId,
+        KRaftControlRecordStateMachine partitionState,
+        ListenerName defaultListenerName,
+        Time time,
+        long requestTimeoutMs
+    ) {
+        this.localId = localId;
+        this.partitionState = partitionState;
+        this.defaultListenerName = defaultListenerName;
+        this.time = time;
+        this.requestTimeoutMs = requestTimeoutMs;
+    }
+
+    public CompletableFuture<UpdateRaftVoterResponseData> 
handleUpdateVoterRequest(
+        LeaderState<?> leaderState,
+        ListenerName requestListenerName,
+        ReplicaKey voterKey,
+        Endpoints voterEndpoints,
+        UpdateRaftVoterRequestData.KRaftVersionFeature supportedKraftVersions,
+        long currentTimeMs
+    ) {
+        // Check if there are any pending voter change requests
+        if (leaderState.isOperationPending(currentTimeMs)) {
+            return CompletableFuture.completedFuture(
+                RaftUtil.updateVoterResponse(
+                    Errors.REQUEST_TIMED_OUT,
+                    requestListenerName,
+                    new LeaderAndEpoch(
+                        localId,
+                        leaderState.epoch()
+                    ),
+                    leaderState.leaderEndpoints()
+                )
+            );
+        }
+
+        // Check that the leader has established a HWM and committed the 
current epoch
+        Optional<Long> highWatermark = 
leaderState.highWatermark().map(LogOffsetMetadata::offset);
+        if (!highWatermark.isPresent()) {
+            return CompletableFuture.completedFuture(
+                RaftUtil.updateVoterResponse(
+                    Errors.REQUEST_TIMED_OUT,
+                    requestListenerName,
+                    new LeaderAndEpoch(
+                        localId,
+                        leaderState.epoch()
+                    ),
+                    leaderState.leaderEndpoints()
+                )
+            );
+        }
+
+        // KAFKA-16538 will implement the case when the kraft.version is 0
+        // Check that the cluster supports kraft.version >= 1
+        KRaftVersion kraftVersion = partitionState.lastKraftVersion();
+        if (!kraftVersion.isReconfigSupported()) {
+            return CompletableFuture.completedFuture(
+                RaftUtil.updateVoterResponse(
+                    Errors.UNSUPPORTED_VERSION,
+                    requestListenerName,
+                    new LeaderAndEpoch(
+                        localId,
+                        leaderState.epoch()
+                    ),
+                    leaderState.leaderEndpoints()
+                )
+            );
+        }
+
+        // Check that there are no uncommitted VotersRecord
+        Optional<LogHistory.Entry<VoterSet>> votersEntry = 
partitionState.lastVoterSetEntry();
+        if (!votersEntry.isPresent() || votersEntry.get().offset() >= 
highWatermark.get()) {
+            return CompletableFuture.completedFuture(
+                RaftUtil.updateVoterResponse(
+                    Errors.REQUEST_TIMED_OUT,
+                    requestListenerName,
+                    new LeaderAndEpoch(
+                        localId,
+                        leaderState.epoch()
+                    ),
+                    leaderState.leaderEndpoints()
+                )
+            );
+        }
+
+        // Check that the supported version range is valid
+        if (!validVersionRange(kraftVersion, supportedKraftVersions)) {
+            return CompletableFuture.completedFuture(
+                RaftUtil.updateVoterResponse(
+                    Errors.INVALID_REQUEST,
+                    requestListenerName,
+                    new LeaderAndEpoch(
+                        localId,
+                        leaderState.epoch()
+                    ),
+                    leaderState.leaderEndpoints()
+                )
+            );
+        }
+
+        // Check that the endpoints include the default listener
+        if (!voterEndpoints.address(defaultListenerName).isPresent()) {
+            return CompletableFuture.completedFuture(
+                RaftUtil.updateVoterResponse(
+                    Errors.INVALID_REQUEST,
+                    requestListenerName,
+                    new LeaderAndEpoch(
+                        localId,
+                        leaderState.epoch()
+                    ),
+                    leaderState.leaderEndpoints()
+                )
+            );
+        }
+
+        // Update the voter
+        Optional<VoterSet> updatedVoters = votersEntry
+            .get()
+            .value()
+            .updateVoter(
+                VoterSet.VoterNode.of(
+                    voterKey,
+                    voterEndpoints,
+                    new SupportedVersionRange(
+                        supportedKraftVersions.minSupportedVersion(),
+                        supportedKraftVersions.maxSupportedVersion()
+                    )
+                )
+            );
+        if (!updatedVoters.isPresent()) {
+            return CompletableFuture.completedFuture(
+                RaftUtil.updateVoterResponse(
+                    Errors.VOTER_NOT_FOUND,
+                    requestListenerName,
+                    new LeaderAndEpoch(
+                        localId,
+                        leaderState.epoch()
+                    ),
+                    leaderState.leaderEndpoints()
+                )
+            );
+        }
+
+        UpdateVoterHandlerState state = new UpdateVoterHandlerState(
+            leaderState.appendVotersRecord(updatedVoters.get(), currentTimeMs),
+            requestListenerName,
+            time.timer(requestTimeoutMs)
+        );
+        leaderState.resetUpdateVoterHandlerState(Errors.UNKNOWN_SERVER_ERROR, 
Optional.of(state));
+
+        return state.future();
+    }
+
+    public void highWatermarkUpdated(LeaderState<?> leaderState) {
+        leaderState.updateVoterHandlerState().ifPresent(current -> {
+            leaderState.highWatermark().ifPresent(highWatermark -> {
+                if (highWatermark.offset() > current.lastOffset()) {
+                    // VotersRecord with the updated voter was committed; 
complete the RPC
+                    leaderState.resetUpdateVoterHandlerState(Errors.NONE, 
Optional.empty());
+                }
+            });
+        });
+    }
+
+    private boolean validVersionRange(
+        KRaftVersion finalizedVersion,
+        UpdateRaftVoterRequestData.KRaftVersionFeature supportedKraftVersions
+    ) {
+        return supportedKraftVersions.minSupportedVersion() <= 
finalizedVersion.featureLevel() &&
+            supportedKraftVersions.maxSupportedVersion() >= 
finalizedVersion.featureLevel();
+    }
+}
diff --git 
a/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandlerState.java
 
b/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandlerState.java
similarity index 60%
copy from 
raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandlerState.java
copy to 
raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandlerState.java
index 670103b4346..c0ac6c51899 100644
--- 
a/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandlerState.java
+++ 
b/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandlerState.java
@@ -16,20 +16,29 @@
  */
 package org.apache.kafka.raft.internals;
 
-import org.apache.kafka.common.message.RemoveRaftVoterResponseData;
+import org.apache.kafka.common.message.UpdateRaftVoterResponseData;
+import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.utils.Timer;
+import org.apache.kafka.raft.Endpoints;
+import org.apache.kafka.raft.LeaderAndEpoch;
 import org.apache.kafka.raft.RaftUtil;
 
 import java.util.concurrent.CompletableFuture;
 
-public final class RemoveVoterHandlerState implements AutoCloseable {
+public final class UpdateVoterHandlerState {
     private final long lastOffset;
+    private final ListenerName requestListenerName;
     private final Timer timeout;
-    private final CompletableFuture<RemoveRaftVoterResponseData> future = new 
CompletableFuture<>();
+    private final CompletableFuture<UpdateRaftVoterResponseData> future = new 
CompletableFuture<>();
 
-    RemoveVoterHandlerState(long lastOffset, Timer timeout) {
+    UpdateVoterHandlerState(
+        long lastOffset,
+        ListenerName requestListenerName,
+        Timer timeout
+    ) {
         this.lastOffset = lastOffset;
+        this.requestListenerName = requestListenerName;
         this.timeout = timeout;
     }
 
@@ -38,16 +47,26 @@ public final class RemoveVoterHandlerState implements 
AutoCloseable {
         return timeout.remainingMs();
     }
 
-    public CompletableFuture<RemoveRaftVoterResponseData> future() {
+    public CompletableFuture<UpdateRaftVoterResponseData> future() {
         return future;
     }
 
-    public long lastOffset() {
-        return lastOffset;
+    public void completeFuture(
+        Errors error,
+        LeaderAndEpoch leaderAndEpoch,
+        Endpoints leaderEndpoints
+    ) {
+        future.complete(
+            RaftUtil.updateVoterResponse(
+                error,
+                requestListenerName,
+                leaderAndEpoch,
+                leaderEndpoints
+            )
+        );
     }
 
-    @Override
-    public void close() {
-        
future.complete(RaftUtil.removeVoterResponse(Errors.NOT_LEADER_OR_FOLLOWER, 
null));
+    public long lastOffset() {
+        return lastOffset;
     }
 }
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java 
b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java
index d583219c380..41f9b93b7d9 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java
@@ -95,6 +95,21 @@ public final class VoterSet {
             .map(address -> new Node(voterId, address.getHostString(), 
address.getPort()));
     }
 
+    /**
+     * Return true if the provided voter node is a voter and would cause a 
change in the voter set.
+     *
+     * @param updatedVoterNode the updated voter node
+     * @return true if the updated voter node is different than the node in 
the voter set; otherwise false.
+     */
+    public boolean voterNodeNeedsUpdate(VoterNode updatedVoterNode) {
+        return 
Optional.ofNullable(voters.get(updatedVoterNode.voterKey().id()))
+            .map(
+                node -> node.isVoter(updatedVoterNode.voterKey()) &&
+                        !node.equals(updatedVoterNode)
+            )
+            .orElse(false);
+    }
+
     /**
      * Returns if the node is a voter in the set of voters.
      *
@@ -209,6 +224,24 @@ public final class VoterSet {
         return Optional.empty();
     }
 
+    /**
+     * Updates a voter in the voter set.
+     *
+     * @param voter the updated voter
+     * @return a new voter set if the voter was updated, otherwise {@code 
Optional.empty()}
+     */
+    public Optional<VoterSet> updateVoter(VoterNode voter) {
+        VoterNode oldVoter = voters.get(voter.voterKey().id());
+        if (oldVoter != null && oldVoter.isVoter(voter.voterKey())) {
+            HashMap<Integer, VoterNode> newVoters = new HashMap<>(voters);
+            newVoters.put(voter.voterKey().id(), voter);
+
+            return Optional.of(new VoterSet(newVoters));
+        }
+
+        return Optional.empty();
+    }
+
     /**
      * Converts a voter set to a voters record for a given version.
      *
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java 
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
index 2ca0e457208..1b34ba0433c 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
@@ -39,6 +39,7 @@ import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.raft.internals.ReplicaKey;
 import org.apache.kafka.raft.internals.VoterSet;
 import org.apache.kafka.raft.internals.VoterSetTest;
+import org.apache.kafka.server.common.Features;
 import org.apache.kafka.server.common.KRaftVersion;
 import org.apache.kafka.snapshot.RecordsSnapshotReader;
 import org.apache.kafka.snapshot.SnapshotReader;
@@ -50,6 +51,7 @@ import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -716,9 +718,6 @@ public class KafkaRaftClientReconfigTest {
             Collections.singletonMap(context.channel.listenerName(), 
newAddress)
         );
 
-        // Show that the new voter is not currently a voter
-        assertFalse(context.client.quorum().isVoter(newVoter));
-
         // Establish a HWM and fence previous leaders
         context.deliverRequest(
             context.fetchRequest(epoch, follower, 
context.log.endOffset().offset(), epoch, 0)
@@ -1092,8 +1091,6 @@ public class KafkaRaftClientReconfigTest {
         context.becomeLeader();
         int epoch = context.currentEpoch();
 
-        assertTrue(context.client.quorum().isVoter(follower2));
-
         // Establish a HWM and fence previous leaders
         context.deliverRequest(
             context.fetchRequest(epoch, follower1, 
context.log.endOffset().offset(), epoch, 0)
@@ -1185,7 +1182,7 @@ public class KafkaRaftClientReconfigTest {
             .withUnknownLeader(3)
             .build();
 
-        // Attempt to add new voter to the quorum
+        // Attempt to remove voter from the quorum
         context.deliverRequest(context.removeVoterRequest(follower1));
         context.pollUntilResponse();
         context.assertSentRemoveVoterResponse(Errors.NOT_LEADER_OR_FOLLOWER);
@@ -1380,8 +1377,6 @@ public class KafkaRaftClientReconfigTest {
         context.becomeLeader();
         int epoch = context.currentEpoch();
 
-        assertTrue(context.client.quorum().isVoter(follower2));
-
         // Establish a HWM and fence previous leaders
         context.deliverRequest(
             context.fetchRequest(epoch, follower1, 
context.log.endOffset().offset(), epoch, 0)
@@ -1426,8 +1421,6 @@ public class KafkaRaftClientReconfigTest {
         context.becomeLeader();
         int epoch = context.currentEpoch();
 
-        assertTrue(context.client.quorum().isVoter(follower2));
-
         // Establish a HWM and fence previous leaders
         context.deliverRequest(
             context.fetchRequest(epoch, follower1, 
context.log.endOffset().offset(), epoch, 0)
@@ -1541,7 +1534,773 @@ public class KafkaRaftClientReconfigTest {
         // Attempt to add new voter to the quorum
         context.deliverRequest(context.addVoterRequest(Integer.MAX_VALUE, 
newVoter, newListeners));
 
-        // Attempt to remove followe while AddVoter is pending
+        // Attempt to remove follower while AddVoter is pending
+        context.deliverRequest(context.removeVoterRequest(follower));
+        context.pollUntilResponse();
+        context.assertSentRemoveVoterResponse(Errors.REQUEST_TIMED_OUT);
+    }
+
+    @Test
+    void testUpdateVoter() throws Exception {
+        ReplicaKey local = replicaKey(randomeReplicaId(), true);
+        ReplicaKey follower = replicaKey(local.id() + 1, true);
+
+        VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower));
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(local.id(), local.directoryId().get())
+            .withKip853Rpc(true)
+            .withBootstrapSnapshot(Optional.of(voters))
+            .withUnknownLeader(3)
+            .build();
+
+        context.becomeLeader();
+        int epoch = context.currentEpoch();
+
+        assertTrue(context.client.quorum().isVoter(follower));
+
+        // Establish a HWM and fence previous leaders
+        context.deliverRequest(
+            context.fetchRequest(epoch, follower, 
context.log.endOffset().offset(), epoch, 0)
+        );
+        context.pollUntilResponse();
+        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, 
OptionalInt.of(local.id()));
+
+        // Attempt to update the follower
+        InetSocketAddress defaultAddress = InetSocketAddress.createUnresolved(
+            "localhost",
+            9990 + follower.id()
+        );
+        InetSocketAddress newAddress = InetSocketAddress.createUnresolved(
+            "localhost",
+            8990 + follower.id()
+        );
+        HashMap<ListenerName, InetSocketAddress> listenersMap = new 
HashMap<>(2);
+        listenersMap.put(context.channel.listenerName(), defaultAddress);
+        listenersMap.put(ListenerName.normalised("ANOTHER_LISTENER"), 
newAddress);
+        Endpoints newListeners = 
Endpoints.fromInetSocketAddresses(listenersMap);
+        context.deliverRequest(
+            context.updateVoterRequest(
+                follower,
+                Features.KRAFT_VERSION.supportedVersionRange(),
+                newListeners
+            )
+        );
+
+        // Handle the update voter request
+        context.client.poll();
+        // Append the VotersRecord to the log
+        context.client.poll();
+
+        // follower should still be a voter in the latest voter set
+        assertTrue(context.client.quorum().isVoter(follower));
+
+        // Send a FETCH to increase the HWM and commit the new voter set
+        context.deliverRequest(
+            context.fetchRequest(epoch, follower, 
context.log.endOffset().offset(), epoch, 0)
+        );
+        context.pollUntilResponse();
+        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, 
OptionalInt.of(local.id()));
+
+        // Expect reply for UpdateVoter request
+        context.pollUntilResponse();
+        context.assertSentUpdateVoterResponse(
+            Errors.NONE,
+            OptionalInt.of(local.id()),
+            epoch
+        );
+    }
+
+    @Test
+    void testLeaderUpdatesVoter() throws Exception {
+        ReplicaKey local = replicaKey(randomeReplicaId(), true);
+        ReplicaKey follower = replicaKey(local.id() + 1, true);
+
+        VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower));
+
+        HashMap<ListenerName, InetSocketAddress> listenersMap = new 
HashMap<>(2);
+        listenersMap.put(
+            VoterSetTest.DEFAULT_LISTENER_NAME,
+            InetSocketAddress.createUnresolved("localhost", 9990 + local.id())
+        );
+        listenersMap.put(
+            ListenerName.normalised("ANOTHER_LISTENER"),
+            InetSocketAddress.createUnresolved("localhost", 8990 + local.id())
+        );
+        Endpoints localListeners = 
Endpoints.fromInetSocketAddresses(listenersMap);
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(local.id(), local.directoryId().get())
+            .withKip853Rpc(true)
+            .withBootstrapSnapshot(Optional.of(voters))
+            .withUnknownLeader(3)
+            .withLocalListeners(localListeners)
+            .build();
+
+        context.becomeLeader();
+        int epoch = context.currentEpoch();
+
+        assertTrue(context.client.quorum().isVoter(follower));
+
+        // Establish a HWM and commit the latest voter set
+        context.deliverRequest(
+            context.fetchRequest(epoch, follower, 
context.log.endOffset().offset(), epoch, 0)
+        );
+        context.pollUntilResponse();
+        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, 
OptionalInt.of(local.id()));
+
+        Optional<VoterSet> updatedVoterSet = voters.updateVoter(
+            VoterSet.VoterNode.of(
+                local,
+                localListeners,
+                Features.KRAFT_VERSION.supportedVersionRange()
+            )
+        );
+        assertEquals(updatedVoterSet, 
context.listener.lastCommittedVoterSet());
+    }
+
+    @Test
+    public void testUpdateVoterInvalidClusterId() throws Exception {
+        ReplicaKey local = replicaKey(randomeReplicaId(), true);
+        ReplicaKey follower = replicaKey(local.id() + 1, true);
+
+        VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower));
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(local.id(), local.directoryId().get())
+            .withKip853Rpc(true)
+            .withBootstrapSnapshot(Optional.of(voters))
+            .withUnknownLeader(3)
+            .build();
+
+        context.becomeLeader();
+        int epoch = context.currentEpoch();
+
+        // empty cluster id is rejected
+        context.deliverRequest(
+            context.updateVoterRequest(
+                "",
+                follower,
+                epoch,
+                Features.KRAFT_VERSION.supportedVersionRange(),
+                Endpoints.empty()
+            )
+        );
+        context.pollUntilResponse();
+        context.assertSentUpdateVoterResponse(
+            Errors.INCONSISTENT_CLUSTER_ID,
+            OptionalInt.of(local.id()),
+            epoch
+        );
+
+        // invalid cluster id is rejected
+        context.deliverRequest(
+            context.updateVoterRequest(
+                "invalid-uuid",
+                follower,
+                epoch,
+                Features.KRAFT_VERSION.supportedVersionRange(),
+                Endpoints.empty()
+            )
+        );
+        context.pollUntilResponse();
+        context.assertSentUpdateVoterResponse(
+            Errors.INCONSISTENT_CLUSTER_ID,
+            OptionalInt.of(local.id()),
+            epoch
+        );
+    }
+
+    @Test
+    void testUpdateVoterOldEpoch() throws Exception {
+        ReplicaKey local = replicaKey(randomeReplicaId(), true);
+        ReplicaKey follower = replicaKey(local.id() + 1, true);
+
+        VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower));
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(local.id(), local.directoryId().get())
+            .withKip853Rpc(true)
+            .withBootstrapSnapshot(Optional.of(voters))
+            .withUnknownLeader(3)
+            .build();
+
+        context.becomeLeader();
+        int epoch = context.currentEpoch();
+
+        context.deliverRequest(
+            context.updateVoterRequest(
+                context.clusterId,
+                follower,
+                epoch - 1,
+                Features.KRAFT_VERSION.supportedVersionRange(),
+                Endpoints.empty()
+            )
+        );
+        context.pollUntilResponse();
+        context.assertSentUpdateVoterResponse(
+            Errors.FENCED_LEADER_EPOCH,
+            OptionalInt.of(local.id()),
+            epoch
+        );
+    }
+
+    @Test
+    void testUpdateVoterNewEpoch() throws Exception {
+        ReplicaKey local = replicaKey(randomeReplicaId(), true);
+        ReplicaKey follower = replicaKey(local.id() + 1, true);
+
+        VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower));
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(local.id(), local.directoryId().get())
+            .withKip853Rpc(true)
+            .withBootstrapSnapshot(Optional.of(voters))
+            .withUnknownLeader(3)
+            .build();
+
+        context.becomeLeader();
+        int epoch = context.currentEpoch();
+
+        context.deliverRequest(
+            context.updateVoterRequest(
+                context.clusterId,
+                follower,
+                epoch + 1,
+                Features.KRAFT_VERSION.supportedVersionRange(),
+                Endpoints.empty()
+            )
+        );
+        context.pollUntilResponse();
+        context.assertSentUpdateVoterResponse(
+            Errors.UNKNOWN_LEADER_EPOCH,
+            OptionalInt.of(local.id()),
+            epoch
+        );
+    }
+
+    @Test
+    void testUpdateVoterToNotLeader() throws Exception {
+        ReplicaKey local = replicaKey(randomeReplicaId(), true);
+        ReplicaKey follower = replicaKey(local.id() + 1, true);
+
+        VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower));
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(local.id(), local.directoryId().get())
+            .withKip853Rpc(true)
+            .withBootstrapSnapshot(Optional.of(voters))
+            .withUnknownLeader(3)
+            .build();
+
+        // Attempt to update voter in the quorum
+        context.deliverRequest(
+            context.updateVoterRequest(
+                follower,
+                Features.KRAFT_VERSION.supportedVersionRange(),
+                Endpoints.empty()
+            )
+        );
+        context.pollUntilResponse();
+        context.assertSentUpdateVoterResponse(
+            Errors.NOT_LEADER_OR_FOLLOWER,
+            OptionalInt.empty(),
+            context.currentEpoch()
+        );
+    }
+
+    @Test
+    void testUpdateVoterWithPendingUpdateVoter() throws Exception {
+        ReplicaKey local = replicaKey(randomeReplicaId(), true);
+        ReplicaKey follower = replicaKey(local.id() + 1, true);
+
+        VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower));
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(local.id(), local.directoryId().get())
+            .withKip853Rpc(true)
+            .withBootstrapSnapshot(Optional.of(voters))
+            .withUnknownLeader(3)
+            .build();
+
+        context.becomeLeader();
+        int epoch = context.currentEpoch();
+
+        // Establish a HWM and fence previous leaders
+        context.deliverRequest(
+            context.fetchRequest(epoch, follower, 
context.log.endOffset().offset(), epoch, 0)
+        );
+        context.pollUntilResponse();
+        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, 
OptionalInt.of(local.id()));
+
+        // Attempt to update the follower
+        InetSocketAddress defaultAddress = InetSocketAddress.createUnresolved(
+            "localhost",
+            9990 + follower.id()
+        );
+        InetSocketAddress newAddress = InetSocketAddress.createUnresolved(
+            "localhost",
+            8990 + follower.id()
+        );
+        HashMap<ListenerName, InetSocketAddress> listenersMap = new 
HashMap<>(2);
+        listenersMap.put(context.channel.listenerName(), defaultAddress);
+        listenersMap.put(ListenerName.normalised("ANOTHER_LISTENER"), 
newAddress);
+        Endpoints newListeners = 
Endpoints.fromInetSocketAddresses(listenersMap);
+        context.deliverRequest(
+            context.updateVoterRequest(
+                follower,
+                Features.KRAFT_VERSION.supportedVersionRange(),
+                newListeners
+            )
+        );
+
+        // Handle the update voter request
+        context.client.poll();
+        // Append the VotersRecord to the log
+        context.client.poll();
+
+        // Attempt to update voter again
+        context.deliverRequest(
+            context.updateVoterRequest(
+                follower,
+                Features.KRAFT_VERSION.supportedVersionRange(),
+                newListeners
+            )
+        );
+        context.pollUntilResponse();
+        context.assertSentUpdateVoterResponse(
+            Errors.REQUEST_TIMED_OUT,
+            OptionalInt.of(local.id()),
+            epoch
+        );
+    }
+
+    @Test
+    void testUpdateVoterWithoutFencedPreviousLeaders() throws Exception {
+        ReplicaKey local = replicaKey(randomeReplicaId(), true);
+        ReplicaKey follower = replicaKey(local.id() + 1, true);
+
+        VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower));
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(local.id(), local.directoryId().get())
+            .withKip853Rpc(true)
+            .withBootstrapSnapshot(Optional.of(voters))
+            .withUnknownLeader(3)
+            .build();
+
+        context.becomeLeader();
+        int epoch = context.currentEpoch();
+
+        // Attempt to update the follower
+        InetSocketAddress defaultAddress = InetSocketAddress.createUnresolved(
+            "localhost",
+            9990 + follower.id()
+        );
+        InetSocketAddress newAddress = InetSocketAddress.createUnresolved(
+            "localhost",
+            8990 + follower.id()
+        );
+        HashMap<ListenerName, InetSocketAddress> listenersMap = new 
HashMap<>(2);
+        listenersMap.put(context.channel.listenerName(), defaultAddress);
+        listenersMap.put(ListenerName.normalised("ANOTHER_LISTENER"), 
newAddress);
+        Endpoints newListeners = 
Endpoints.fromInetSocketAddresses(listenersMap);
+        context.deliverRequest(
+            context.updateVoterRequest(
+                follower,
+                Features.KRAFT_VERSION.supportedVersionRange(),
+                newListeners
+            )
+        );
+        context.pollUntilResponse();
+        context.assertSentUpdateVoterResponse(
+            Errors.REQUEST_TIMED_OUT,
+            OptionalInt.of(local.id()),
+            epoch
+        );
+    }
+
+    // KAFKA-16538 is going to allow UpdateVoter RPC when the kraft.version is 0
+    @Test
+    void testUpdateVoterWithKraftVersion0() throws Exception {
+        ReplicaKey local = replicaKey(randomeReplicaId(), true);
+        ReplicaKey follower = replicaKey(local.id() + 1, true);
+
+        VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower));
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(local.id(), local.directoryId().get())
+            .withKip853Rpc(true)
+            .withStaticVoters(voters)
+            .withUnknownLeader(3)
+            .build();
+
+        context.becomeLeader();
+        int epoch = context.currentEpoch();
+
+        // Establish a HWM and fence previous leaders
+        context.deliverRequest(
+            context.fetchRequest(epoch, follower, 
context.log.endOffset().offset(), epoch, 0)
+        );
+        context.pollUntilResponse();
+        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, 
OptionalInt.of(local.id()));
+
+        // Attempt to update the follower
+        InetSocketAddress defaultAddress = InetSocketAddress.createUnresolved(
+            "localhost",
+            9990 + follower.id()
+        );
+        InetSocketAddress newAddress = InetSocketAddress.createUnresolved(
+            "localhost",
+            8990 + follower.id()
+        );
+        HashMap<ListenerName, InetSocketAddress> listenersMap = new 
HashMap<>(2);
+        listenersMap.put(context.channel.listenerName(), defaultAddress);
+        listenersMap.put(ListenerName.normalised("ANOTHER_LISTENER"), 
newAddress);
+        Endpoints newListeners = 
Endpoints.fromInetSocketAddresses(listenersMap);
+        context.deliverRequest(
+            context.updateVoterRequest(
+                follower,
+                Features.KRAFT_VERSION.supportedVersionRange(),
+                newListeners
+            )
+        );
+        context.pollUntilResponse();
+        context.assertSentUpdateVoterResponse(
+            Errors.UNSUPPORTED_VERSION,
+            OptionalInt.of(local.id()),
+            epoch
+        );
+    }
+
+    @Test
+    void testUpdateVoterWithNoneVoter() throws Exception {
+        ReplicaKey local = replicaKey(randomeReplicaId(), true);
+        ReplicaKey follower = replicaKey(local.id() + 1, true);
+
+        VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower));
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(local.id(), local.directoryId().get())
+            .withKip853Rpc(true)
+            .withBootstrapSnapshot(Optional.of(voters))
+            .withUnknownLeader(3)
+            .build();
+
+        context.becomeLeader();
+        int epoch = context.currentEpoch();
+
+        // Establish a HWM and fence previous leaders
+        context.deliverRequest(
+            context.fetchRequest(epoch, follower, 
context.log.endOffset().offset(), epoch, 0)
+        );
+        context.pollUntilResponse();
+        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, 
OptionalInt.of(local.id()));
+
+        // Attempt to update a replica with the same id as follower
+        InetSocketAddress defaultAddress = InetSocketAddress.createUnresolved(
+            "localhost",
+            9990 + follower.id()
+        );
+        InetSocketAddress newAddress = InetSocketAddress.createUnresolved(
+            "localhost",
+            8990 + follower.id()
+        );
+        HashMap<ListenerName, InetSocketAddress> listenersMap = new 
HashMap<>(2);
+        listenersMap.put(context.channel.listenerName(), defaultAddress);
+        listenersMap.put(ListenerName.normalised("ANOTHER_LISTENER"), 
newAddress);
+        Endpoints newListeners = 
Endpoints.fromInetSocketAddresses(listenersMap);
+        context.deliverRequest(
+            context.updateVoterRequest(
+                replicaKey(follower.id(), true),
+                Features.KRAFT_VERSION.supportedVersionRange(),
+                newListeners
+            )
+        );
+        context.pollUntilResponse();
+        context.assertSentUpdateVoterResponse(
+            Errors.VOTER_NOT_FOUND,
+            OptionalInt.of(local.id()),
+            epoch
+        );
+    }
+
+    @Test
+    void testUpdateVoterWithNoneVoterId() throws Exception {
+        ReplicaKey local = replicaKey(randomeReplicaId(), true);
+        ReplicaKey follower = replicaKey(local.id() + 1, true);
+
+        VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower));
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(local.id(), local.directoryId().get())
+            .withKip853Rpc(true)
+            .withBootstrapSnapshot(Optional.of(voters))
+            .withUnknownLeader(3)
+            .build();
+
+        context.becomeLeader();
+        int epoch = context.currentEpoch();
+
+        // Establish a HWM and fence previous leaders
+        context.deliverRequest(
+            context.fetchRequest(epoch, follower, 
context.log.endOffset().offset(), epoch, 0)
+        );
+        context.pollUntilResponse();
+        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, 
OptionalInt.of(local.id()));
+
+        // Attempt to update a replica with a different id but the same directory id as follower
+        InetSocketAddress defaultAddress = InetSocketAddress.createUnresolved(
+            "localhost",
+            9990 + follower.id() + 1
+        );
+        InetSocketAddress newAddress = InetSocketAddress.createUnresolved(
+            "localhost",
+            8990 + follower.id() + 1
+        );
+        HashMap<ListenerName, InetSocketAddress> listenersMap = new 
HashMap<>(2);
+        listenersMap.put(context.channel.listenerName(), defaultAddress);
+        listenersMap.put(ListenerName.normalised("ANOTHER_LISTENER"), 
newAddress);
+        Endpoints newListeners = 
Endpoints.fromInetSocketAddresses(listenersMap);
+        context.deliverRequest(
+            context.updateVoterRequest(
+                ReplicaKey.of(follower.id() + 1, follower.directoryId().get()),
+                Features.KRAFT_VERSION.supportedVersionRange(),
+                newListeners
+            )
+        );
+        context.pollUntilResponse();
+        context.assertSentUpdateVoterResponse(
+            Errors.VOTER_NOT_FOUND,
+            OptionalInt.of(local.id()),
+            epoch
+        );
+    }
+
+    @Test
+    void testUpdateVoterTimedOut() throws Exception {
+        ReplicaKey local = replicaKey(randomeReplicaId(), true);
+        ReplicaKey follower = replicaKey(local.id() + 1, true);
+
+        VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower));
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(local.id(), local.directoryId().get())
+            .withKip853Rpc(true)
+            .withBootstrapSnapshot(Optional.of(voters))
+            .withUnknownLeader(3)
+            .build();
+
+        context.becomeLeader();
+        int epoch = context.currentEpoch();
+
+        // Establish a HWM and fence previous leaders
+        context.deliverRequest(
+            context.fetchRequest(epoch, follower, 
context.log.endOffset().offset(), epoch, 0)
+        );
+        context.pollUntilResponse();
+        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, 
OptionalInt.of(local.id()));
+
+        // Attempt to update the follower
+        InetSocketAddress defaultAddress = InetSocketAddress.createUnresolved(
+            "localhost",
+            9990 + follower.id()
+        );
+        InetSocketAddress newAddress = InetSocketAddress.createUnresolved(
+            "localhost",
+            8990 + follower.id()
+        );
+        HashMap<ListenerName, InetSocketAddress> listenersMap = new 
HashMap<>(2);
+        listenersMap.put(context.channel.listenerName(), defaultAddress);
+        listenersMap.put(ListenerName.normalised("ANOTHER_LISTENER"), 
newAddress);
+        Endpoints newListeners = 
Endpoints.fromInetSocketAddresses(listenersMap);
+        context.deliverRequest(
+            context.updateVoterRequest(
+                follower,
+                Features.KRAFT_VERSION.supportedVersionRange(),
+                newListeners
+            )
+        );
+
+        // Handle the update voter request
+        context.client.poll();
+        // Append the VotersRecord to the log
+        context.client.poll();
+
+        // Wait for request timeout without sending a FETCH request to timeout 
the update voter RPC
+        context.time.sleep(context.requestTimeoutMs());
+
+        // Expect a timeout error
+        context.pollUntilResponse();
+        context.assertSentUpdateVoterResponse(
+            Errors.REQUEST_TIMED_OUT,
+            OptionalInt.of(local.id()),
+            epoch
+        );
+    }
+
+    @Test
+    void testUpdateVoterFailsWhenLosingLeadership() throws Exception {
+        ReplicaKey local = replicaKey(randomeReplicaId(), true);
+        ReplicaKey follower = replicaKey(local.id() + 1, true);
+
+        VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower));
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(local.id(), local.directoryId().get())
+            .withKip853Rpc(true)
+            .withBootstrapSnapshot(Optional.of(voters))
+            .withUnknownLeader(3)
+            .build();
+
+        context.becomeLeader();
+        int epoch = context.currentEpoch();
+
+        // Establish a HWM and fence previous leaders
+        context.deliverRequest(
+            context.fetchRequest(epoch, follower, 
context.log.endOffset().offset(), epoch, 0)
+        );
+        context.pollUntilResponse();
+        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, 
OptionalInt.of(local.id()));
+
+        // Attempt to update the follower
+        InetSocketAddress defaultAddress = InetSocketAddress.createUnresolved(
+            "localhost",
+            9990 + follower.id()
+        );
+        InetSocketAddress newAddress = InetSocketAddress.createUnresolved(
+            "localhost",
+            8990 + follower.id()
+        );
+        HashMap<ListenerName, InetSocketAddress> listenersMap = new 
HashMap<>(2);
+        listenersMap.put(context.channel.listenerName(), defaultAddress);
+        listenersMap.put(ListenerName.normalised("ANOTHER_LISTENER"), 
newAddress);
+        Endpoints newListeners = 
Endpoints.fromInetSocketAddresses(listenersMap);
+        context.deliverRequest(
+            context.updateVoterRequest(
+                follower,
+                Features.KRAFT_VERSION.supportedVersionRange(),
+                newListeners
+            )
+        );
+
+        // Handle the update voter request
+        context.client.poll();
+        // Append the VotersRecord to the log
+        context.client.poll();
+
+        // Leader completes the UpdateVoter RPC when resigning
+        context.client.resign(epoch);
+        context.pollUntilResponse();
+        context.assertSentUpdateVoterResponse(
+            Errors.NOT_LEADER_OR_FOLLOWER,
+            OptionalInt.of(local.id()),
+            epoch
+        );
+    }
+
+    @Test
+    void testUpdateVoterWithPendingAddVoter() throws Exception {
+        ReplicaKey local = replicaKey(randomeReplicaId(), true);
+        ReplicaKey follower = replicaKey(local.id() + 1, true);
+
+        VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower));
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(local.id(), local.directoryId().get())
+            .withKip853Rpc(true)
+            .withBootstrapSnapshot(Optional.of(voters))
+            .withUnknownLeader(3)
+            .build();
+
+        context.becomeLeader();
+        int epoch = context.currentEpoch();
+
+        ReplicaKey newVoter = replicaKey(local.id() + 2, true);
+        InetSocketAddress newVoterAddress = InetSocketAddress.createUnresolved(
+            "localhost",
+            9990 + newVoter.id()
+        );
+        Endpoints newVoterListeners = Endpoints.fromInetSocketAddresses(
+            Collections.singletonMap(context.channel.listenerName(), 
newVoterAddress)
+        );
+
+        // Establish a HWM and fence previous leaders
+        context.deliverRequest(
+            context.fetchRequest(epoch, follower, 
context.log.endOffset().offset(), epoch, 0)
+        );
+        context.pollUntilResponse();
+        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, 
OptionalInt.of(local.id()));
+
+        // Catch up the new voter to the leader's LEO
+        context.deliverRequest(
+            context.fetchRequest(epoch, newVoter, 
context.log.endOffset().offset(), epoch, 0)
+        );
+        context.pollUntilResponse();
+        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, 
OptionalInt.of(local.id()));
+
+        // Attempt to add new voter to the quorum
+        context.deliverRequest(context.addVoterRequest(Integer.MAX_VALUE, 
newVoter, newVoterListeners));
+
+        // Attempt to update the follower
+        InetSocketAddress defaultAddress = InetSocketAddress.createUnresolved(
+            "localhost",
+            9990 + follower.id()
+        );
+        InetSocketAddress newAddress = InetSocketAddress.createUnresolved(
+            "localhost",
+            8990 + follower.id()
+        );
+        HashMap<ListenerName, InetSocketAddress> listenersMap = new 
HashMap<>(2);
+        listenersMap.put(context.channel.listenerName(), defaultAddress);
+        listenersMap.put(ListenerName.normalised("ANOTHER_LISTENER"), 
newAddress);
+        Endpoints newListeners = 
Endpoints.fromInetSocketAddresses(listenersMap);
+        context.deliverRequest(
+            context.updateVoterRequest(
+                follower,
+                Features.KRAFT_VERSION.supportedVersionRange(),
+                newListeners
+            )
+        );
+        context.pollUntilResponse();
+        context.assertSentUpdateVoterResponse(
+            Errors.REQUEST_TIMED_OUT,
+            OptionalInt.of(local.id()),
+            epoch
+        );
+    }
+
+    @Test
+    void testRemoveVoterWithPendingUpdateVoter() throws Exception {
+        ReplicaKey local = replicaKey(randomeReplicaId(), true);
+        ReplicaKey follower = replicaKey(local.id() + 1, true);
+
+        VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower));
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(local.id(), local.directoryId().get())
+            .withKip853Rpc(true)
+            .withBootstrapSnapshot(Optional.of(voters))
+            .withUnknownLeader(3)
+            .build();
+
+        context.becomeLeader();
+        int epoch = context.currentEpoch();
+
+        // Establish a HWM and fence previous leaders
+        context.deliverRequest(
+            context.fetchRequest(epoch, follower, 
context.log.endOffset().offset(), epoch, 0)
+        );
+        context.pollUntilResponse();
+        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, 
OptionalInt.of(local.id()));
+
+        // Attempt to update the follower
+        InetSocketAddress defaultAddress = InetSocketAddress.createUnresolved(
+            "localhost",
+            9990 + follower.id()
+        );
+        InetSocketAddress newAddress = InetSocketAddress.createUnresolved(
+            "localhost",
+            8990 + follower.id()
+        );
+        HashMap<ListenerName, InetSocketAddress> listenersMap = new 
HashMap<>(2);
+        listenersMap.put(context.channel.listenerName(), defaultAddress);
+        listenersMap.put(ListenerName.normalised("ANOTHER_LISTENER"), 
newAddress);
+        Endpoints newListeners = 
Endpoints.fromInetSocketAddresses(listenersMap);
+        context.deliverRequest(
+            context.updateVoterRequest(
+                follower,
+                Features.KRAFT_VERSION.supportedVersionRange(),
+                newListeners
+            )
+        );
+
+        // Attempt to remove follower while UpdateVoter is pending
         context.deliverRequest(context.removeVoterRequest(follower));
         context.pollUntilResponse();
         context.assertSentRemoveVoterResponse(Errors.REQUEST_TIMED_OUT);
@@ -1575,7 +2334,7 @@ public class KafkaRaftClientReconfigTest {
     }
 
     private static ApiVersionsResponseData apiVersionsResponse(Errors error) {
-        return apiVersionsResponse(error, new SupportedVersionRange((short) 0, (short) 1));
+        // Advertise the kraft.version range reported by Features instead of a hard-coded 0..1 range
+        return apiVersionsResponse(error, Features.KRAFT_VERSION.supportedVersionRange());
     }
 
     private static ApiVersionsResponseData apiVersionsResponse(Errors error, 
SupportedVersionRange supportedVersions) {
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index 419618c00ff..e74e32fb79a 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -2730,7 +2730,12 @@ public class KafkaRaftClientTest {
         context.collectEndQuorumRequests(
             epoch,
             Utils.mkSet(closeFollower.id(), laggingFollower.id()),
-            Optional.of(Arrays.asList(closeFollower.id(), 
laggingFollower.id()))
+            Optional.of(
+                Arrays.asList(
+                    replicaKey(closeFollower.id(), false),
+                    replicaKey(laggingFollower.id(), false)
+                )
+            )
         );
     }
 
diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
index 8ce18ec9d44..04ff1ed29c2 100644
--- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.raft.internals.BatchAccumulator;
 import org.apache.kafka.raft.internals.ReplicaKey;
 import org.apache.kafka.raft.internals.VoterSet;
 import org.apache.kafka.raft.internals.VoterSetTest;
+import org.apache.kafka.server.common.Features;
 import org.apache.kafka.server.common.KRaftVersion;
 
 import org.junit.jupiter.api.Test;
@@ -75,6 +76,7 @@ public class LeaderStateTest {
             voters.voterIds(),
             accumulator,
             voters.listeners(localReplicaKey.id()),
+            Features.KRAFT_VERSION.supportedVersionRange(),
             fetchTimeoutMs,
             logContext
         );
@@ -120,6 +122,7 @@ public class LeaderStateTest {
                 Collections.emptySet(),
                 null,
                 Endpoints.empty(),
+                Features.KRAFT_VERSION.supportedVersionRange(),
                 fetchTimeoutMs,
                 logContext
             )
diff --git a/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java b/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java
index 236b858a046..77f7234d337 100644
--- a/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.raft.internals.KRaftControlRecordStateMachine;
 import org.apache.kafka.raft.internals.ReplicaKey;
 import org.apache.kafka.raft.internals.VoterSet;
 import org.apache.kafka.raft.internals.VoterSetTest;
+import org.apache.kafka.server.common.Features;
 import org.apache.kafka.server.common.KRaftVersion;
 
 import org.junit.jupiter.params.ParameterizedTest;
@@ -80,6 +81,7 @@ public class QuorumStateTest {
             localDirectoryId,
             mockPartitionState,
             localId.isPresent() ? voterSet.listeners(localId.getAsInt()) : 
Endpoints.empty(),
+            Features.KRAFT_VERSION.supportedVersionRange(),
             electionTimeoutMs,
             fetchTimeoutMs,
             store,
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index 695af19f7b1..d7097d6b7f5 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.feature.SupportedVersionRange;
 import org.apache.kafka.common.memory.MemoryPool;
 import org.apache.kafka.common.message.AddRaftVoterRequestData;
 import org.apache.kafka.common.message.AddRaftVoterResponseData;
@@ -39,8 +40,11 @@ import org.apache.kafka.common.message.LeaderChangeMessage;
 import org.apache.kafka.common.message.LeaderChangeMessage.Voter;
 import org.apache.kafka.common.message.RemoveRaftVoterRequestData;
 import org.apache.kafka.common.message.RemoveRaftVoterResponseData;
+import org.apache.kafka.common.message.UpdateRaftVoterRequestData;
+import org.apache.kafka.common.message.UpdateRaftVoterResponseData;
 import org.apache.kafka.common.message.VoteRequestData;
 import org.apache.kafka.common.message.VoteResponseData;
+import org.apache.kafka.common.message.VotersRecord;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ApiMessage;
@@ -62,6 +66,7 @@ import org.apache.kafka.raft.internals.BatchBuilder;
 import org.apache.kafka.raft.internals.ReplicaKey;
 import org.apache.kafka.raft.internals.StringSerde;
 import org.apache.kafka.raft.internals.VoterSet;
+import org.apache.kafka.server.common.Features;
 import org.apache.kafka.server.common.KRaftVersion;
 import org.apache.kafka.server.common.serialization.RecordSerde;
 import org.apache.kafka.snapshot.RecordsSnapshotWriter;
@@ -92,6 +97,7 @@ import java.util.function.Function;
 import java.util.function.UnaryOperator;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import java.util.stream.Stream;
 
 import static org.apache.kafka.raft.LeaderState.CHECK_QUORUM_TIMEOUT_FACTOR;
 import static org.apache.kafka.raft.RaftUtil.hasValidTopicPartition;
@@ -167,6 +173,7 @@ public final class RaftClientTestContext {
         private List<InetSocketAddress> bootstrapServers = 
Collections.emptyList();
         private boolean kip853Rpc = false;
         private Optional<VoterSet> startingVoters = Optional.empty();
+        private Endpoints localListeners = Endpoints.empty();
         private boolean isStartingVotersStatic = false;
 
         public Builder(int localId, Set<Integer> staticVoters) {
@@ -346,6 +353,11 @@ public final class RaftClientTestContext {
             return this;
         }
 
+        /**
+         * Overrides the listeners configured for the local replica.
+         *
+         * When the value is left empty, {@code build()} uses the local id's listeners from the
+         * starting voter set, or empty endpoints if no local id is set.
+         *
+         * @param localListeners the endpoints to use for the local replica
+         * @return this builder
+         */
+        Builder withLocalListeners(Endpoints localListeners) {
+            this.localListeners = localListeners;
+            return this;
+        }
+
         public RaftClientTestContext build() throws IOException {
             VoterSet startingVoters = 
this.startingVoters.orElseThrow(Builder::missingStartingVoterException);
 
@@ -365,11 +377,16 @@ public final class RaftClientTestContext {
                     );
             }
 
-            // Compute the local listeners. Only potential voters/leader need 
to provide the local listeners
-            // If the local id is not set (must be observer), the local 
listener can be empty.
-            Endpoints localListeners = localId.isPresent() ?
-                startingVoters.listeners(localId.getAsInt()) :
-                Endpoints.empty();
+            /*
+             * Compute the local listeners if the test didn't override it.
+             * Only potential voters/leader need to provide the local listeners.
+             * If the local id is not set (must be observer), the local listener can be empty.
+             */
+            Endpoints localListeners = this.localListeners.isEmpty() ?
+                localId.isPresent() ?
+                    startingVoters.listeners(localId.getAsInt()) :
+                    Endpoints.empty() :
+                this.localListeners;
 
             QuorumConfig quorumConfig = new QuorumConfig(
                 requestTimeoutMs,
@@ -394,6 +411,7 @@ public final class RaftClientTestContext {
                 clusterId,
                 bootstrapServers,
                 localListeners,
+                Features.KRAFT_VERSION.supportedVersionRange(),
                 logContext,
                 random,
                 quorumConfig
@@ -1148,14 +1166,50 @@ public final class RaftClientTestContext {
         return removeVoterResponse;
     }
 
-    // TODO: preferredSuccessors should be a list of replica keys
+    /**
+     * Drains the sent UPDATE_RAFT_VOTER responses and validates the single response found.
+     *
+     * The response must carry the given error, leader id (-1 when {@code leaderId} is empty) and
+     * epoch. If the response names a leader, its host and port on the channel's listener must
+     * match the listeners of that leader in the starting voter set.
+     *
+     * @param error the expected top-level error
+     * @param leaderId the expected leader id, or empty if no leader is expected
+     * @param epoch the expected leader epoch
+     * @return the validated response
+     */
+    UpdateRaftVoterResponseData assertSentUpdateVoterResponse(
+        Errors error,
+        OptionalInt leaderId,
+        int epoch
+    ) {
+        List<RaftResponse.Outbound> drained = drainSentResponses(ApiKeys.UPDATE_RAFT_VOTER);
+        assertEquals(1, drained.size());
+
+        RaftResponse.Outbound onlyResponse = drained.get(0);
+        assertInstanceOf(UpdateRaftVoterResponseData.class, onlyResponse.data());
+        UpdateRaftVoterResponseData updateVoterResponse = (UpdateRaftVoterResponseData) onlyResponse.data();
+
+        assertEquals(error, Errors.forCode(updateVoterResponse.errorCode()));
+        assertEquals(leaderId.orElse(-1), updateVoterResponse.currentLeader().leaderId());
+        assertEquals(epoch, updateVoterResponse.currentLeader().leaderEpoch());
+
+        int reportedLeaderId = updateVoterResponse.currentLeader().leaderId();
+        if (reportedLeaderId < 0) {
+            // No leader in the response, so there is no endpoint to verify
+            return updateVoterResponse;
+        }
+
+        InetSocketAddress reportedAddress = InetSocketAddress.createUnresolved(
+            updateVoterResponse.currentLeader().host(),
+            updateVoterResponse.currentLeader().port()
+        );
+        assertEquals(
+            startingVoters.listeners(reportedLeaderId),
+            Endpoints.fromInetSocketAddresses(
+                Collections.singletonMap(channel.listenerName(), reportedAddress)
+            )
+        );
+        return updateVoterResponse;
+    }
+
     List<RaftRequest.Outbound> collectEndQuorumRequests(
         int epoch,
         Set<Integer> destinationIdSet,
-        Optional<List<Integer>> preferredSuccessorsOpt
+        Optional<List<ReplicaKey>> preferredCandidates
     ) {
         List<RaftRequest.Outbound> endQuorumRequests = new ArrayList<>();
         Set<Integer> collectedDestinationIdSet = new HashSet<>();
+
+        Optional<List<Integer>> preferredSuccessorsOpt = preferredCandidates
+            .map(list -> 
list.stream().map(ReplicaKey::id).collect(Collectors.toList()));
+
         for (RaftRequest.Outbound raftMessage : channel.drainSendQueue()) {
             if (raftMessage.data() instanceof EndQuorumEpochRequestData) {
                 EndQuorumEpochRequestData request = 
(EndQuorumEpochRequestData) raftMessage.data();
@@ -1168,6 +1222,16 @@ public final class RaftClientTestContext {
                 preferredSuccessorsOpt.ifPresent(preferredSuccessors ->
                     assertEquals(preferredSuccessors, 
partitionRequest.preferredSuccessors())
                 );
+                preferredCandidates.ifPresent(preferred ->
+                    assertEquals(
+                        preferred,
+                        partitionRequest
+                            .preferredCandidates()
+                            .stream()
+                            .map(replica -> 
ReplicaKey.of(replica.candidateId(), replica.candidateDirectoryId()))
+                            .collect(Collectors.toList())
+                    )
+                );
 
                 collectedDestinationIdSet.add(raftMessage.destination().id());
                 endQuorumRequests.add(raftMessage);
@@ -1644,6 +1708,24 @@ public final class RaftClientTestContext {
         return RaftUtil.removeVoterRequest(cluster, voter);
     }
 
+    /**
+     * Builds an UpdateRaftVoter request for this context's cluster id at the current epoch.
+     *
+     * @param voter the voter being updated
+     * @param supportedVersions the version range to send for the voter
+     * @param endpoints the endpoints to send for the voter
+     * @return the request data
+     */
+    UpdateRaftVoterRequestData updateVoterRequest(
+        ReplicaKey voter,
+        SupportedVersionRange supportedVersions,
+        Endpoints endpoints
+    ) {
+        int epoch = currentEpoch();
+        return updateVoterRequest(clusterId, voter, epoch, supportedVersions, endpoints);
+    }
+
+    /**
+     * Builds an UpdateRaftVoter request with an explicit cluster id and epoch by delegating to
+     * {@code RaftUtil.updateVoterRequest}.
+     *
+     * @param clusterId the cluster id to place in the request
+     * @param voter the voter being updated
+     * @param epoch the epoch to place in the request
+     * @param supportedVersions the version range to send for the voter
+     * @param endpoints the endpoints to send for the voter
+     * @return the request data
+     */
+    UpdateRaftVoterRequestData updateVoterRequest(
+        String clusterId,
+        ReplicaKey voter,
+        int epoch,
+        SupportedVersionRange supportedVersions,
+        Endpoints endpoints
+    ) {
+        UpdateRaftVoterRequestData request = RaftUtil.updateVoterRequest(
+            clusterId,
+            voter,
+            epoch,
+            supportedVersions,
+            endpoints
+        );
+        return request;
+    }
+
     private short fetchRpcVersion() {
         if (kip853Rpc) {
             return 17;
@@ -1708,6 +1790,14 @@ public final class RaftClientTestContext {
         }
     }
 
+    /**
+     * Returns the UpdateRaftVoter RPC version used by this test context.
+     *
+     * @return version 0 when KIP-853 RPCs are enabled
+     * @throws IllegalStateException if the context was not built with withKip853Rpc(true)
+     */
+    private short updateVoterRpcVersion() {
+        if (!kip853Rpc) {
+            throw new IllegalStateException("Reconfiguration must be enabled by calling withKip853Rpc(true)");
+        }
+        return 0;
+    }
+
     private short raftRequestVersion(ApiMessage request) {
         if (request instanceof FetchRequestData) {
             return fetchRpcVersion();
@@ -1725,6 +1815,8 @@ public final class RaftClientTestContext {
             return addVoterRpcVersion();
         } else if (request instanceof RemoveRaftVoterRequestData) {
             return removeVoterRpcVersion();
+        } else if (request instanceof UpdateRaftVoterRequestData) {
+            return updateVoterRpcVersion();
         } else {
             throw new IllegalArgumentException(String.format("Request %s is 
not a raft request", request));
         }
@@ -1747,6 +1839,8 @@ public final class RaftClientTestContext {
             return addVoterRpcVersion();
         } else if (response instanceof RemoveRaftVoterResponseData) {
             return removeVoterRpcVersion();
+        } else if (response instanceof UpdateRaftVoterResponseData) {
+            return updateVoterRpcVersion();
         } else if (response instanceof ApiVersionsResponseData) {
             return 4;
         } else {
@@ -1822,6 +1916,20 @@ public final class RaftClientTestContext {
             }
         }
 
+        /**
+         * Returns the voter set described by the last KRAFT_VOTERS control record found in the
+         * committed batches, or empty if the commits contain no such record.
+         */
+        Optional<VoterSet> lastCommittedVoterSet() {
+            return commits.stream()
+                .flatMap(batch -> batch.controlRecords().stream())
+                .filter(controlRecord -> controlRecord.type() == ControlRecordType.KRAFT_VOTERS)
+                .map(controlRecord -> (VotersRecord) controlRecord.message())
+                // Keep only the most recent voters record
+                .reduce((earlier, later) -> later)
+                .map(VoterSet::fromVotersRecord);
+        }
+
         OptionalInt currentClaimedEpoch() {
             if (localId.isPresent() && 
currentLeaderAndEpoch.isLeader(localId.getAsInt())) {
                 return OptionalInt.of(currentLeaderAndEpoch.epoch());
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
index a932ec72d4f..9b1a325dd2f 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
@@ -35,6 +35,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.raft.MockLog.LogBatch;
 import org.apache.kafka.raft.MockLog.LogEntry;
 import org.apache.kafka.raft.internals.BatchMemoryPool;
+import org.apache.kafka.server.common.Features;
 import org.apache.kafka.server.common.serialization.RecordSerde;
 import org.apache.kafka.snapshot.RecordsSnapshotReader;
 import org.apache.kafka.snapshot.SnapshotReader;
@@ -791,6 +792,7 @@ public class RaftEventSimulationTest {
                 clusterId,
                 Collections.emptyList(),
                 endpointsFromId(nodeId, channel.listenerName()),
+                Features.KRAFT_VERSION.supportedVersionRange(),
                 logContext,
                 random,
                 quorumConfig
diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java
index 2085c442087..bd65bbe993f 100644
--- a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.raft.LogOffsetMetadata;
 import org.apache.kafka.raft.MockQuorumStateStore;
 import org.apache.kafka.raft.OffsetAndEpoch;
 import org.apache.kafka.raft.QuorumState;
+import org.apache.kafka.server.common.Features;
 import org.apache.kafka.server.common.KRaftVersion;
 
 import org.junit.jupiter.api.AfterEach;
@@ -82,6 +83,7 @@ public class KafkaRaftMetricsTest {
             localDirectoryId,
             mockPartitionState,
             voterSet.listeners(localId),
+            Features.KRAFT_VERSION.supportedVersionRange(),
             electionTimeoutMs,
             fetchTimeoutMs,
             new MockQuorumStateStore(),
diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/VoterSetTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/VoterSetTest.java
index 06846636ea4..a4f2354d690 100644
--- a/raft/src/test/java/org/apache/kafka/raft/internals/VoterSetTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/internals/VoterSetTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.feature.SupportedVersionRange;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.raft.Endpoints;
+import org.apache.kafka.server.common.Features;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -115,6 +116,37 @@ public final class VoterSetTest {
         );
     }
 
+    /**
+     * Checks that updateVoter and voterNodeNeedsUpdate reject nodes that do not match a stored
+     * voter, and that a node reusing voter 3's key with new endpoints and a new supported
+     * version range replaces the stored entry.
+     */
+    @Test
+    void testUpdateVoter() {
+        Map<Integer, VoterSet.VoterNode> expectedVoters = voterMap(IntStream.of(1, 2, 3), true);
+        VoterSet voterSet = new VoterSet(new HashMap<>(expectedVoters));
+
+        // Id 4 is not in the voter set
+        assertEquals(Optional.empty(), voterSet.updateVoter(voterNode(4, true)));
+        assertFalse(voterSet.voterNodeNeedsUpdate(voterNode(4, true)));
+        // A freshly built node for id 3 is rejected as well
+        // NOTE(review): presumably because voterNode(3, true) gets a different directory id - confirm
+        assertEquals(Optional.empty(), voterSet.updateVoter(voterNode(3, true)));
+        assertFalse(voterSet.voterNodeNeedsUpdate(voterNode(3, true)));
+
+        // Reuse the stored key of voter 3 but change its endpoints and supported versions
+        Endpoints updatedEndpoints = Endpoints.fromInetSocketAddresses(
+            Collections.singletonMap(
+                ListenerName.normalised("ABC"),
+                InetSocketAddress.createUnresolved("abc", 1234)
+            )
+        );
+        SupportedVersionRange updatedVersions = new SupportedVersionRange((short) 1, (short) 1);
+        VoterSet.VoterNode updatedVoter3 = VoterSet.VoterNode.of(
+            expectedVoters.get(3).voterKey(),
+            updatedEndpoints,
+            updatedVersions
+        );
+        expectedVoters.put(3, updatedVoter3);
+
+        assertTrue(voterSet.voterNodeNeedsUpdate(updatedVoter3));
+        VoterSet expectedVoterSet = new VoterSet(new HashMap<>(expectedVoters));
+        assertEquals(Optional.of(expectedVoterSet), voterSet.updateVoter(updatedVoter3));
+    }
+
+
     @Test
     void testCannotRemoveToEmptyVoterSet() {
         Map<Integer, VoterSet.VoterNode> aVoterMap = voterMap(IntStream.of(1), 
true);
@@ -339,7 +371,7 @@ public final class VoterSetTest {
                     )
                 )
             ),
-            new SupportedVersionRange((short) 0, (short) 0)
+            Features.KRAFT_VERSION.supportedVersionRange()
         );
     }
 
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/Features.java b/server-common/src/main/java/org/apache/kafka/server/common/Features.java
index f10b95dd696..15269e51836 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/Features.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/Features.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.server.common;
 
+import org.apache.kafka.common.feature.SupportedVersionRange;
+
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
@@ -86,6 +88,13 @@ public enum Features {
         return featureVersions[featureVersions.length - 1].featureLevel();
     }
 
+    /**
+     * Returns the range of levels supported for this feature, from
+     * {@link #minimumProduction()} up to {@link #latestTesting()}.
+     *
+     * @return the supported version range of this feature
+     */
+    public SupportedVersionRange supportedVersionRange() {
+        return new SupportedVersionRange(minimumProduction(), latestTesting());
+    }
+
     /**
      * Creates a FeatureVersion from a level.
      *
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java
index ae14735ad15..a55dc7318c4 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java
@@ -86,6 +86,22 @@ public enum KRaftVersion implements FeatureVersion {
             case KRAFT_VERSION_1:
                 return (short) 1;
         }
-        throw new RuntimeException("Unknown KRaft feature level: " + this);
+        throw new IllegalStateException("Unsupported KRaft feature level: " + 
this);
+    }
+
+    /**
+     * Returns the KRaftVersionRecord version associated with this kraft.version.
+     *
+     * @return 0 for {@code KRAFT_VERSION_1}
+     * @throws IllegalStateException for any other kraft.version
+     */
+    public short kraftVersionRecordVersion() {
+        if (this == KRAFT_VERSION_1) {
+            return (short) 0;
+        }
+        throw new IllegalStateException("Unsupported KRaft feature level: " + this);
+    }
+
+    /**
+     * Returns the VotersRecord version associated with this kraft.version.
+     *
+     * @return 0 for {@code KRAFT_VERSION_1}
+     * @throws IllegalStateException for any other kraft.version
+     */
+    public short votersRecordVersion() {
+        if (this == KRAFT_VERSION_1) {
+            return (short) 0;
+        }
+        throw new IllegalStateException("Unsupported KRaft feature level: " + this);
     }
 }
diff --git a/server-common/src/test/java/org/apache/kafka/server/common/KRaftVersionTest.java b/server-common/src/test/java/org/apache/kafka/server/common/KRaftVersionTest.java
index 4c6d417bb8b..d8309be01b5 100644
--- a/server-common/src/test/java/org/apache/kafka/server/common/KRaftVersionTest.java
+++ b/server-common/src/test/java/org/apache/kafka/server/common/KRaftVersionTest.java
@@ -17,11 +17,14 @@
 
 package org.apache.kafka.server.common;
 
+import org.apache.kafka.common.record.ControlRecordUtils;
+
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
-class KRaftVersionTest {
+public final class KRaftVersionTest {
     @Test
     public void testFeatureLevel() {
         for (int i = 0; i < KRaftVersion.values().length; i++) {
@@ -59,4 +62,52 @@ class KRaftVersionTest {
             }
         }
     }
+
+    /**
+     * Verifies kraftVersionRecordVersion() for every KRaftVersion: version 0 must throw
+     * IllegalStateException and version 1 must match ControlRecordUtils.KRAFT_VERSION_CURRENT_VERSION.
+     * Any other enum value fails the test so that new versions cannot be added without coverage.
+     */
+    @Test
+    public void testKraftVersionRecordVersion() {
+        for (KRaftVersion kraftVersion : KRaftVersion.values()) {
+            if (kraftVersion == KRaftVersion.KRAFT_VERSION_0) {
+                assertThrows(
+                    IllegalStateException.class,
+                    kraftVersion::kraftVersionRecordVersion
+                );
+            } else if (kraftVersion == KRaftVersion.KRAFT_VERSION_1) {
+                assertEquals(
+                    ControlRecordUtils.KRAFT_VERSION_CURRENT_VERSION,
+                    kraftVersion.kraftVersionRecordVersion()
+                );
+            } else {
+                throw new RuntimeException("Unsupported value " + kraftVersion);
+            }
+        }
+    }
+
+    /**
+     * Verifies votersRecordVersion() for every KRaftVersion: version 0 must throw
+     * IllegalStateException and version 1 must match ControlRecordUtils.KRAFT_VOTERS_CURRENT_VERSION.
+     * Any other enum value fails the test so that new versions cannot be added without coverage.
+     */
+    @Test
+    public void testVotersRecordVersion() {
+        for (KRaftVersion kraftVersion : KRaftVersion.values()) {
+            switch (kraftVersion) {
+                case KRAFT_VERSION_0:
+                    assertThrows(
+                        IllegalStateException.class,
+                        () -> kraftVersion.votersRecordVersion()
+                    );
+                    break;
+
+                case KRAFT_VERSION_1:
+                    assertEquals(
+                        ControlRecordUtils.KRAFT_VOTERS_CURRENT_VERSION,
+                        kraftVersion.votersRecordVersion()
+                    );
+                    break;
+
+                default:
+                    throw new RuntimeException("Unsupported value " + kraftVersion);
+            }
+        }
+    }
 }

Reply via email to