This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8ce514a52ef KAFKA-16534; Implement update voter sending (#16837)
8ce514a52ef is described below

commit 8ce514a52ef579721cb831ad528a4c8d2d8135b4
Author: José Armando García Sancio <[email protected]>
AuthorDate: Thu Aug 8 19:16:09 2024 -0400

    KAFKA-16534; Implement update voter sending (#16837)
    
    This change implements the sending of the UpdateVoter request by KRaft
    voters. The UpdateVoter RPC is used to update a voter's listeners and
    supported kraft versions. The UpdateVoter RPC is sent if the replicated
    voter set (VotersRecord in the log) doesn't match the local voter's
    supported kraft versions and controller listeners.
    
    To not starve the Fetch request, the UpdateVoter request is sent at most
    once every 3 fetch timeouts. This is required to make sure that
    replication is making progress and eventually the voter set in the
    replicated log matches the local voter configuration.
    
    This change also modifies the semantics of UpdateVoter. Now the
    UpdateVoter response is sent right after the leader has created the new
    voter set. This is required so that the updating voter can transition
    from sending UpdateVoter requests to sending Fetch requests. If the
    leader waits for the VotersRecord control record to commit before
    sending the UpdateVoter response, it may never send the UpdateVoter
    response. This can happen if the leader needs that voter's Fetch request
    to commit the control record.
    
    Reviewers: Colin P. McCabe <[email protected]>
---
 .../common/message/UpdateRaftVoterRequest.json     |   2 +-
 .../java/org/apache/kafka/raft/FollowerState.java  |  42 ++-
 .../org/apache/kafka/raft/KafkaRaftClient.java     |  85 ++++-
 .../java/org/apache/kafka/raft/LeaderState.java    |  78 ++---
 .../java/org/apache/kafka/raft/QuorumState.java    |   9 +-
 .../main/java/org/apache/kafka/raft/RaftUtil.java  |   2 +
 .../kafka/raft/internals/UpdateVoterHandler.java   |  35 +-
 .../kafka/raft/KafkaRaftClientReconfigTest.java    | 388 +++++++++------------
 .../org/apache/kafka/raft/LeaderStateTest.java     |   3 -
 .../apache/kafka/raft/RaftClientTestContext.java   |  38 ++
 10 files changed, 364 insertions(+), 318 deletions(-)

diff --git 
a/clients/src/main/resources/common/message/UpdateRaftVoterRequest.json 
b/clients/src/main/resources/common/message/UpdateRaftVoterRequest.json
index 5dde41cff56..dadca902f15 100644
--- a/clients/src/main/resources/common/message/UpdateRaftVoterRequest.json
+++ b/clients/src/main/resources/common/message/UpdateRaftVoterRequest.json
@@ -21,7 +21,7 @@
   "validVersions": "0",
   "flexibleVersions": "0+",
   "fields": [
-    { "name": "ClusterId", "type": "string", "versions": "0+" },
+    { "name": "ClusterId", "type": "string", "versions": "0+", 
"nullableVersions": "0+" },
     { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "0+",
       "about": "The current leader epoch of the partition, -1 for unknown 
leader epoch" },
     { "name": "VoterId", "type": "int32", "versions": "0+",
diff --git a/raft/src/main/java/org/apache/kafka/raft/FollowerState.java 
b/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
index a4d635e0388..2e26e7c5a70 100644
--- a/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
@@ -34,7 +34,7 @@ public class FollowerState implements EpochState {
     private final int fetchTimeoutMs;
     private final int epoch;
     private final int leaderId;
-    private final Endpoints endpoints;
+    private final Endpoints leaderEndpoints;
     private final Set<Integer> voters;
     // Used for tracking the expiration of both the Fetch and FetchSnapshot 
requests
     private final Timer fetchTimer;
@@ -43,6 +43,8 @@ public class FollowerState implements EpochState {
      * Fetch request are paused
      */
     private Optional<RawSnapshotWriter> fetchingSnapshot = Optional.empty();
+    // Used to throttle update voter request and allow for Fetch/FetchSnapshot 
requests
+    private final Timer updateVoterPeriodTimer;
 
     private final Logger log;
 
@@ -50,7 +52,7 @@ public class FollowerState implements EpochState {
         Time time,
         int epoch,
         int leaderId,
-        Endpoints endpoints,
+        Endpoints leaderEndpoints,
         Set<Integer> voters,
         Optional<LogOffsetMetadata> highWatermark,
         int fetchTimeoutMs,
@@ -59,9 +61,10 @@ public class FollowerState implements EpochState {
         this.fetchTimeoutMs = fetchTimeoutMs;
         this.epoch = epoch;
         this.leaderId = leaderId;
-        this.endpoints = endpoints;
+        this.leaderEndpoints = leaderEndpoints;
         this.voters = voters;
         this.fetchTimer = time.timer(fetchTimeoutMs);
+        this.updateVoterPeriodTimer = time.timer(updateVoterPeriodMs());
         this.highWatermark = highWatermark;
         this.log = logContext.logger(FollowerState.class);
     }
@@ -78,7 +81,7 @@ public class FollowerState implements EpochState {
 
     @Override
     public Endpoints leaderEndpoints() {
-        return endpoints;
+        return leaderEndpoints;
     }
 
     @Override
@@ -96,7 +99,7 @@ public class FollowerState implements EpochState {
     }
 
     public Node leaderNode(ListenerName listener) {
-        return endpoints
+        return leaderEndpoints
             .address(listener)
             .map(address -> new Node(leaderId, address.getHostString(), 
address.getPort()))
             .orElseThrow(() ->
@@ -105,7 +108,7 @@ public class FollowerState implements EpochState {
                         "Unknown endpoint for leader %d and listener %s, known 
endpoints are %s",
                         leaderId,
                         listener,
-                        endpoints
+                        leaderEndpoints
                     )
                 )
             );
@@ -126,6 +129,27 @@ public class FollowerState implements EpochState {
         fetchTimer.reset(timeoutMs);
     }
 
+    private long updateVoterPeriodMs() {
+        // Allow for a few rounds of fetch request before attempting to update
+        // the voter state
+        return fetchTimeoutMs * 3;
+    }
+
+    public boolean hasUpdateVoterPeriodExpired(long currentTimeMs) {
+        updateVoterPeriodTimer.update(currentTimeMs);
+        return updateVoterPeriodTimer.isExpired();
+    }
+
+    public long remainingUpdateVoterPeriodMs(long currentTimeMs) {
+        updateVoterPeriodTimer.update(currentTimeMs);
+        return updateVoterPeriodTimer.remainingMs();
+    }
+
+    public void resetUpdateVoterPeriod(long currentTimeMs) {
+        updateVoterPeriodTimer.update(currentTimeMs);
+        updateVoterPeriodTimer.reset(updateVoterPeriodMs());
+    }
+
     public boolean updateHighWatermark(OptionalLong newHighWatermark) {
         if (!newHighWatermark.isPresent() && highWatermark.isPresent()) {
             throw new IllegalArgumentException(
@@ -192,12 +216,12 @@ public class FollowerState implements EpochState {
     @Override
     public String toString() {
         return String.format(
-            "FollowerState(fetchTimeoutMs=%d, epoch=%d, leader=%d, 
endpoints=%s, voters=%s, highWatermark=%s, " +
-            "fetchingSnapshot=%s)",
+            "FollowerState(fetchTimeoutMs=%d, epoch=%d, leader=%d, 
leaderEndpoints=%s, " +
+            "voters=%s, highWatermark=%s, fetchingSnapshot=%s)",
             fetchTimeoutMs,
             epoch,
             leaderId,
-            endpoints,
+            leaderEndpoints,
             voters,
             highWatermark,
             fetchingSnapshot
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java 
b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index ed11e45b520..9c5035af297 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -359,7 +359,6 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
             // add or remove voter request that need to be completed
             addVoterHandler.highWatermarkUpdated(state);
             removeVoterHandler.highWatermarkUpdated(state);
-            updateVoterHandler.highWatermarkUpdated(state);
 
             // After updating the high watermark, we first clear the append
             // purgatory so that we have an opportunity to route the pending
@@ -622,7 +621,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
         // The high watermark can only be advanced once we have written a 
record
         // from the new leader's epoch. Hence we write a control message 
immediately
         // to ensure there is no delay committing pending data.
-        state.appendLeaderChangeMessageAndBootstrapRecords(currentTimeMs);
+        state.appendStartOfEpochControlRecords(quorum.localVoterNodeOrThrow(), 
currentTimeMs);
 
         resetConnections();
         kafkaRaftMetrics.maybeUpdateElectionLatency(currentTimeMs);
@@ -2253,6 +2252,43 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
         );
     }
 
+    private boolean handleUpdateVoterResponse(
+        RaftResponse.Inbound responseMetadata,
+        long currentTimeMs
+    ) {
+        UpdateRaftVoterResponseData data = (UpdateRaftVoterResponseData) 
responseMetadata.data();
+
+        Errors error = Errors.forCode(data.errorCode());
+        OptionalInt responseLeaderId = 
optionalLeaderId(data.currentLeader().leaderId());
+        int responseEpoch = data.currentLeader().leaderEpoch();
+
+        final Endpoints leaderEndpoints;
+        if (responseLeaderId.isPresent() && 
data.currentLeader().host().isEmpty()) {
+            leaderEndpoints = Endpoints.fromInetSocketAddresses(
+                Collections.singletonMap(
+                    channel.listenerName(),
+                    InetSocketAddress.createUnresolved(
+                        data.currentLeader().host(),
+                        data.currentLeader().port()
+                    )
+                )
+            );
+        } else {
+            leaderEndpoints = Endpoints.empty();
+        }
+
+        Optional<Boolean> handled = maybeHandleCommonResponse(
+            error,
+            responseLeaderId,
+            responseEpoch,
+            leaderEndpoints,
+            responseMetadata.source(),
+            currentTimeMs
+        );
+
+        return handled.orElse(true);
+    }
+
     private boolean hasConsistentLeader(int epoch, OptionalInt leaderId) {
         // Only elected leaders are sent in the request/response header, so if 
we have an elected
         // leaderId, it should be consistent with what is in the message.
@@ -2419,6 +2455,10 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
                 handledSuccessfully = handleApiVersionsResponse(response, 
currentTimeMs);
                 break;
 
+            case UPDATE_RAFT_VOTER:
+                handledSuccessfully = handleUpdateVoterResponse(response, 
currentTimeMs);
+                break;
+
             default:
                 throw new IllegalArgumentException("Received unexpected 
response type: " + apiKey);
         }
@@ -2964,19 +3004,32 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
 
     private long pollFollowerAsVoter(FollowerState state, long currentTimeMs) {
         GracefulShutdown shutdown = this.shutdown.get();
+        final long backoffMs;
         if (shutdown != null) {
             // If we are a follower, then we can shutdown immediately. We want 
to
             // skip the transition to candidate in any case.
-            return 0;
+            backoffMs = 0;
         } else if (state.hasFetchTimeoutExpired(currentTimeMs)) {
             logger.info("Become candidate due to fetch timeout");
             transitionToCandidate(currentTimeMs);
-            return 0L;
+            backoffMs = 0;
+        } else if (state.hasUpdateVoterPeriodExpired(currentTimeMs) &&
+            partitionState.lastKraftVersion().isReconfigSupported() &&
+            
partitionState.lastVoterSet().voterNodeNeedsUpdate(quorum.localVoterNodeOrThrow())
+        ) {
+            backoffMs = maybeSendUpdateVoterRequest(state, currentTimeMs);
+            state.resetUpdateVoterPeriod(currentTimeMs);
         } else {
-            long backoffMs = maybeSendFetchOrFetchSnapshot(state, 
currentTimeMs);
-
-            return Math.min(backoffMs, 
state.remainingFetchTimeMs(currentTimeMs));
+            backoffMs = maybeSendFetchOrFetchSnapshot(state, currentTimeMs);
         }
+
+        return Math.min(
+            backoffMs,
+            Math.min(
+                state.remainingFetchTimeMs(currentTimeMs),
+                state.remainingUpdateVoterPeriodMs(currentTimeMs)
+            )
+        );
     }
 
     private long pollFollowerAsObserver(FollowerState state, long 
currentTimeMs) {
@@ -3024,6 +3077,24 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
         );
     }
 
+    private UpdateRaftVoterRequestData buildUpdateVoterRequest() {
+        return RaftUtil.updateVoterRequest(
+            clusterId,
+            quorum.localReplicaKeyOrThrow(),
+            quorum.epoch(),
+            localSupportedKRaftVersion,
+            localListeners
+        );
+    }
+
+    private long maybeSendUpdateVoterRequest(FollowerState state, long 
currentTimeMs) {
+        return maybeSendRequest(
+            currentTimeMs,
+            state.leaderNode(channel.listenerName()),
+            this::buildUpdateVoterRequest
+        );
+    }
+
     private long pollVoted(long currentTimeMs) {
         VotedState state = quorum.votedStateOrThrow();
         GracefulShutdown shutdown = this.shutdown.get();
diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java 
b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
index ecc0e68a0f8..df1cc6e142e 100644
--- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.raft;
 
-import org.apache.kafka.common.feature.SupportedVersionRange;
 import org.apache.kafka.common.message.KRaftVersionRecord;
 import org.apache.kafka.common.message.LeaderChangeMessage;
 import org.apache.kafka.common.message.LeaderChangeMessage.Voter;
@@ -32,7 +31,6 @@ import org.apache.kafka.raft.internals.AddVoterHandlerState;
 import org.apache.kafka.raft.internals.BatchAccumulator;
 import org.apache.kafka.raft.internals.RemoveVoterHandlerState;
 import org.apache.kafka.raft.internals.ReplicaKey;
-import org.apache.kafka.raft.internals.UpdateVoterHandlerState;
 import org.apache.kafka.raft.internals.VoterSet;
 import org.apache.kafka.server.common.KRaftVersion;
 
@@ -46,7 +44,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.OptionalInt;
 import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -68,7 +65,6 @@ public class LeaderState<T> implements EpochState {
     private final long epochStartOffset;
     private final Set<Integer> grantingVoters;
     private final Endpoints localListeners;
-    private final SupportedVersionRange localSupportedKRaftVersion;
     private final VoterSet voterSetAtEpochStart;
     // This field is non-empty if the voter set at epoch start came from a 
snapshot or log segment
     private final OptionalLong offsetOfVotersAtEpochStart;
@@ -78,8 +74,6 @@ public class LeaderState<T> implements EpochState {
     private Map<Integer, ReplicaState> voterStates = new HashMap<>();
     private Optional<AddVoterHandlerState> addVoterHandlerState = 
Optional.empty();
     private Optional<RemoveVoterHandlerState> removeVoterHandlerState = 
Optional.empty();
-    private Optional<UpdateVoterHandlerState> updateVoterHandlerState = 
Optional.empty();
-
 
     private final Map<ReplicaKey, ReplicaState> observerStates = new 
HashMap<>();
     private final Logger log;
@@ -105,7 +99,6 @@ public class LeaderState<T> implements EpochState {
         Set<Integer> grantingVoters,
         BatchAccumulator<T> accumulator,
         Endpoints localListeners,
-        SupportedVersionRange localSupportedKRaftVersion,
         int fetchTimeoutMs,
         LogContext logContext
     ) {
@@ -113,7 +106,6 @@ public class LeaderState<T> implements EpochState {
         this.epoch = epoch;
         this.epochStartOffset = epochStartOffset;
         this.localListeners = localListeners;
-        this.localSupportedKRaftVersion = localSupportedKRaftVersion;
 
         for (VoterSet.VoterNode voterNode: voterSetAtEpochStart.voterNodes()) {
             boolean hasAcknowledgedLeader = voterNode.isVoter(localReplicaKey);
@@ -247,24 +239,6 @@ public class LeaderState<T> implements EpochState {
         removeVoterHandlerState = state;
     }
 
-    public Optional<UpdateVoterHandlerState> updateVoterHandlerState() {
-        return updateVoterHandlerState;
-    }
-
-    public void resetUpdateVoterHandlerState(
-        Errors error,
-        Optional<UpdateVoterHandlerState> state
-    ) {
-        updateVoterHandlerState.ifPresent(
-            handlerState -> handlerState.completeFuture(
-                error,
-                new LeaderAndEpoch(OptionalInt.of(localReplicaKey.id()), 
epoch),
-                localListeners
-            )
-        );
-        updateVoterHandlerState = state;
-    }
-
     public long maybeExpirePendingOperation(long currentTimeMs) {
         // First abort any expired operations
         long timeUntilAddVoterExpiration = addVoterHandlerState()
@@ -283,27 +257,14 @@ public class LeaderState<T> implements EpochState {
             resetRemoveVoterHandlerState(Errors.REQUEST_TIMED_OUT, null, 
Optional.empty());
         }
 
-        long timeUntilUpdateVoterExpiration = updateVoterHandlerState()
-            .map(state -> state.timeUntilOperationExpiration(currentTimeMs))
-            .orElse(Long.MAX_VALUE);
-
-        if (timeUntilUpdateVoterExpiration == 0) {
-            resetUpdateVoterHandlerState(Errors.REQUEST_TIMED_OUT, 
Optional.empty());
-        }
-
         // Reread the timeouts and return the smaller of them
         return Math.min(
             addVoterHandlerState()
                 .map(state -> 
state.timeUntilOperationExpiration(currentTimeMs))
                 .orElse(Long.MAX_VALUE),
-            Math.min(
-                removeVoterHandlerState()
-                    .map(state -> 
state.timeUntilOperationExpiration(currentTimeMs))
-                    .orElse(Long.MAX_VALUE),
-                updateVoterHandlerState()
-                    .map(state -> 
state.timeUntilOperationExpiration(currentTimeMs))
-                    .orElse(Long.MAX_VALUE)
-            )
+            removeVoterHandlerState()
+                .map(state -> 
state.timeUntilOperationExpiration(currentTimeMs))
+                .orElse(Long.MAX_VALUE)
         );
     }
 
@@ -318,7 +279,25 @@ public class LeaderState<T> implements EpochState {
             .collect(Collectors.toList());
     }
 
-    public void appendLeaderChangeMessageAndBootstrapRecords(long 
currentTimeMs) {
+    public void appendStartOfEpochControlRecords(VoterSet.VoterNode 
localVoterNode, long currentTimeMs) {
+        if (!localReplicaKey.equals(localVoterNode.voterKey())) {
+            throw new IllegalArgumentException(
+                String.format(
+                    "Replica key %s didn't match the local key %s",
+                    localVoterNode.voterKey(),
+                    localReplicaKey
+                )
+            );
+        } else if (!localListeners.equals(localVoterNode.listeners())) {
+            throw new IllegalArgumentException(
+                String.format(
+                    "Listeners %s didn't match the local listeners %s",
+                    localVoterNode.listeners(),
+                    localListeners
+                )
+            );
+        }
+
         List<Voter> voters = convertToVoters(voterStates.keySet());
         List<Voter> grantingVoters = convertToVoters(this.grantingVoters());
 
@@ -360,22 +339,16 @@ public class LeaderState<T> implements EpochState {
                         )
                     );
 
-                    VoterSet.VoterNode updatedVoterNode = 
VoterSet.VoterNode.of(
-                        localReplicaKey,
-                        localListeners,
-                        localSupportedKRaftVersion
-                    );
-
                     // The leader should write the latest voters record if its 
local listeners are different
                     // or it has never written a voters record to the log 
before.
-                    if (offset == -1 || 
voterSetAtEpochStart.voterNodeNeedsUpdate(updatedVoterNode)) {
+                    if (offset == -1 || 
voterSetAtEpochStart.voterNodeNeedsUpdate(localVoterNode)) {
                         VoterSet updatedVoterSet = voterSetAtEpochStart
-                            .updateVoter(updatedVoterNode)
+                            .updateVoter(localVoterNode)
                             .orElseThrow(
                                 () -> new IllegalStateException(
                                     String.format(
                                         "Update expected for leader node %s 
and voter set %s",
-                                        updatedVoterNode,
+                                        localVoterNode,
                                         voterSetAtEpochStart
                                     )
                                 )
@@ -885,7 +858,6 @@ public class LeaderState<T> implements EpochState {
     public void close() {
         resetAddVoterHandlerState(Errors.NOT_LEADER_OR_FOLLOWER, null, 
Optional.empty());
         resetRemoveVoterHandlerState(Errors.NOT_LEADER_OR_FOLLOWER, null, 
Optional.empty());
-        resetUpdateVoterHandlerState(Errors.NOT_LEADER_OR_FOLLOWER, 
Optional.empty());
 
         accumulator.close();
     }
diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java 
b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
index 913a7f474d5..ac22fb65d1b 100644
--- a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
@@ -285,6 +285,14 @@ public class QuorumState {
         return ReplicaKey.of(localIdOrThrow(), localDirectoryId());
     }
 
+    public VoterSet.VoterNode localVoterNodeOrThrow() {
+        return VoterSet.VoterNode.of(
+            localReplicaKeyOrThrow(),
+            localListeners,
+            localSupportedKRaftVersion
+        );
+    }
+
     public int epoch() {
         return state.epoch();
     }
@@ -554,7 +562,6 @@ public class QuorumState {
             candidateState.grantingVoters(),
             accumulator,
             localListeners,
-            localSupportedKRaftVersion,
             fetchTimeoutMs,
             logContext
         );
diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java 
b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
index 89c896182fb..7ec629bc9b5 100644
--- a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
+++ b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
@@ -70,6 +70,8 @@ public class RaftUtil {
                 return new 
FetchSnapshotResponseData().setErrorCode(error.code());
             case API_VERSIONS:
                 return new 
ApiVersionsResponseData().setErrorCode(error.code());
+            case UPDATE_RAFT_VOTER:
+                return new 
UpdateRaftVoterResponseData().setErrorCode(error.code());
             default:
                 throw new IllegalArgumentException("Received response for 
unexpected request type: " + apiKey);
         }
diff --git 
a/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandler.java 
b/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandler.java
index 152de68f3fc..985ec189195 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandler.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandler.java
@@ -45,9 +45,7 @@ import java.util.concurrent.CompletableFuture;
  * 5. Check that the updated voter is still listening on the default listener.
  * 6. Append the updated VotersRecord to the log. The KRaft internal listener 
will read this
  *    uncommitted record from the log and update the voter in the set of 
voters.
- * 7. Wait for the VotersRecord to commit using the majority of the voters. 
Return a
- *    REQUEST_TIMED_OUT error if it doesn't commit in time.
- * 8. Send the UpdateVoter successful response to the voter.
+ * 7. Send the UpdateVoter successful response to the voter.
  *
  * KAFKA-16538 is going to add support for handling this RPC when the 
kraft.version is 0.
  */
@@ -202,25 +200,20 @@ public final class UpdateVoterHandler {
             );
         }
 
-        UpdateVoterHandlerState state = new UpdateVoterHandlerState(
-            leaderState.appendVotersRecord(updatedVoters.get(), currentTimeMs),
-            requestListenerName,
-            time.timer(requestTimeoutMs)
-        );
-        leaderState.resetUpdateVoterHandlerState(Errors.UNKNOWN_SERVER_ERROR, 
Optional.of(state));
-
-        return state.future();
-    }
+        leaderState.appendVotersRecord(updatedVoters.get(), currentTimeMs);
 
-    public void highWatermarkUpdated(LeaderState<?> leaderState) {
-        leaderState.updateVoterHandlerState().ifPresent(current -> {
-            leaderState.highWatermark().ifPresent(highWatermark -> {
-                if (highWatermark.offset() > current.lastOffset()) {
-                    // VotersRecord with the updated voter was committed; 
complete the RPC
-                    leaderState.resetUpdateVoterHandlerState(Errors.NONE, 
Optional.empty());
-                }
-            });
-        });
+        // Reply immediately and don't wait for the change to commit
+        return CompletableFuture.completedFuture(
+            RaftUtil.updateVoterResponse(
+                Errors.NONE,
+                requestListenerName,
+                new LeaderAndEpoch(
+                    localId,
+                    leaderState.epoch()
+                ),
+                leaderState.leaderEndpoints()
+            )
+        );
     }
 
     private boolean validVersionRange(
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java 
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
index 1b34ba0433c..bb1cbbb7bb0 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
@@ -1586,28 +1586,16 @@ public class KafkaRaftClientReconfigTest {
             )
         );
 
-        // Handle the update voter request
-        context.client.poll();
-        // Append the VotersRecord to the log
-        context.client.poll();
-
-        // follower should not be a voter in the latest voter set
-        assertTrue(context.client.quorum().isVoter(follower));
-
-        // Send a FETCH to increase the HWM and commit the new voter set
-        context.deliverRequest(
-            context.fetchRequest(epoch, follower, 
context.log.endOffset().offset(), epoch, 0)
-        );
-        context.pollUntilResponse();
-        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, 
OptionalInt.of(local.id()));
-
-        // Expect reply for UpdateVoter request
+        // Expect reply for UpdateVoter request without commiting the record
         context.pollUntilResponse();
         context.assertSentUpdateVoterResponse(
             Errors.NONE,
             OptionalInt.of(local.id()),
             epoch
         );
+
+        // follower should still be a voter in the latest voter set
+        assertTrue(context.client.quorum().isVoter(follower));
     }
 
     @Test
@@ -1803,71 +1791,6 @@ public class KafkaRaftClientReconfigTest {
         );
     }
 
-    @Test
-    void testUpdateVoterWithPendingUpdateVoter() throws Exception {
-        ReplicaKey local = replicaKey(randomeReplicaId(), true);
-        ReplicaKey follower = replicaKey(local.id() + 1, true);
-
-        VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower));
-
-        RaftClientTestContext context = new 
RaftClientTestContext.Builder(local.id(), local.directoryId().get())
-            .withKip853Rpc(true)
-            .withBootstrapSnapshot(Optional.of(voters))
-            .withUnknownLeader(3)
-            .build();
-
-        context.becomeLeader();
-        int epoch = context.currentEpoch();
-
-        // Establish a HWM and fence previous leaders
-        context.deliverRequest(
-            context.fetchRequest(epoch, follower, 
context.log.endOffset().offset(), epoch, 0)
-        );
-        context.pollUntilResponse();
-        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, 
OptionalInt.of(local.id()));
-
-        // Attempt to update the follower
-        InetSocketAddress defaultAddress = InetSocketAddress.createUnresolved(
-            "localhost",
-            9990 + follower.id()
-        );
-        InetSocketAddress newAddress = InetSocketAddress.createUnresolved(
-            "localhost",
-            8990 + follower.id()
-        );
-        HashMap<ListenerName, InetSocketAddress> listenersMap = new 
HashMap<>(2);
-        listenersMap.put(context.channel.listenerName(), defaultAddress);
-        listenersMap.put(ListenerName.normalised("ANOTHER_LISTENER"), 
newAddress);
-        Endpoints newListeners = 
Endpoints.fromInetSocketAddresses(listenersMap);
-        context.deliverRequest(
-            context.updateVoterRequest(
-                follower,
-                Features.KRAFT_VERSION.supportedVersionRange(),
-                newListeners
-            )
-        );
-
-        // Handle the update voter request
-        context.client.poll();
-        // Append the VotersRecord to the log
-        context.client.poll();
-
-        // Attempt to update voter again
-        context.deliverRequest(
-            context.updateVoterRequest(
-                follower,
-                Features.KRAFT_VERSION.supportedVersionRange(),
-                newListeners
-            )
-        );
-        context.pollUntilResponse();
-        context.assertSentUpdateVoterResponse(
-            Errors.REQUEST_TIMED_OUT,
-            OptionalInt.of(local.id()),
-            epoch
-        );
-    }
-
     @Test
     void testUpdateVoterWithoutFencedPreviousLeaders() throws Exception {
         ReplicaKey local = replicaKey(randomeReplicaId(), true);
@@ -2067,7 +1990,7 @@ public class KafkaRaftClientReconfigTest {
     }
 
     @Test
-    void testUpdateVoterTimedOut() throws Exception {
+    void testUpdateVoterWithPendingAddVoter() throws Exception {
         ReplicaKey local = replicaKey(randomeReplicaId(), true);
         ReplicaKey follower = replicaKey(local.id() + 1, true);
 
@@ -2082,6 +2005,15 @@ public class KafkaRaftClientReconfigTest {
         context.becomeLeader();
         int epoch = context.currentEpoch();
 
+        ReplicaKey newVoter = replicaKey(local.id() + 2, true);
+        InetSocketAddress newVoterAddress = InetSocketAddress.createUnresolved(
+            "localhost",
+            9990 + newVoter.id()
+        );
+        Endpoints newVoterListeners = Endpoints.fromInetSocketAddresses(
+            Collections.singletonMap(context.channel.listenerName(), 
newVoterAddress)
+        );
+
         // Establish a HWM and fence previous leaders
         context.deliverRequest(
             context.fetchRequest(epoch, follower, 
context.log.endOffset().offset(), epoch, 0)
@@ -2089,6 +2021,16 @@ public class KafkaRaftClientReconfigTest {
         context.pollUntilResponse();
         context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id()));
 
+        // Catch up the new voter to the leader's LEO
+        context.deliverRequest(
+            context.fetchRequest(epoch, newVoter, context.log.endOffset().offset(), epoch, 0)
+        );
+        context.pollUntilResponse();
+        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id()));
+
+        // Attempt to add new voter to the quorum
+        context.deliverRequest(context.addVoterRequest(Integer.MAX_VALUE, newVoter, newVoterListeners));
+
         // Attempt to update the follower
         InetSocketAddress defaultAddress = InetSocketAddress.createUnresolved(
             "localhost",
@@ -2109,16 +2051,6 @@ public class KafkaRaftClientReconfigTest {
                 newListeners
             )
         );
-
-        // Handle the update voter request
-        context.client.poll();
-        // Append the VotersRecord to the log
-        context.client.poll();
-
-        // Wait for request timeout without sending a FETCH request to timeout the update voter RPC
-        context.time.sleep(context.requestTimeoutMs());
-
-        // Expect a timeout error
         context.pollUntilResponse();
         context.assertSentUpdateVoterResponse(
             Errors.REQUEST_TIMED_OUT,
@@ -2128,182 +2060,192 @@ public class KafkaRaftClientReconfigTest {
     }
 
     @Test
-    void testUpdateVoterFailsWhenLosingLeadership() throws Exception {
+    void testFollowerSendsUpdateVoter() throws Exception {
         ReplicaKey local = replicaKey(randomeReplicaId(), true);
-        ReplicaKey follower = replicaKey(local.id() + 1, true);
+        ReplicaKey voter1 = replicaKey(local.id() + 1, true);
+        ReplicaKey voter2 = replicaKey(local.id() + 2, true);
 
-        VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower));
+        VoterSet voters = VoterSetTest.voterSet(Stream.of(local, voter1, voter2));
+        int epoch = 4;
+
+        HashMap<ListenerName, InetSocketAddress> listenersMap = new HashMap<>(2);
+        listenersMap.put(
+            VoterSetTest.DEFAULT_LISTENER_NAME,
+            InetSocketAddress.createUnresolved("localhost", 9990 + local.id())
+        );
+        listenersMap.put(
+            ListenerName.normalised("ANOTHER_LISTENER"),
+            InetSocketAddress.createUnresolved("localhost", 8990 + local.id())
+        );
+        Endpoints localListeners = Endpoints.fromInetSocketAddresses(listenersMap);
 
         RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get())
             .withKip853Rpc(true)
             .withBootstrapSnapshot(Optional.of(voters))
-            .withUnknownLeader(3)
+            .withElectedLeader(epoch, voter1.id())
+            .withLocalListeners(localListeners)
             .build();
 
-        context.becomeLeader();
-        int epoch = context.currentEpoch();
+        // waiting for 3 times the fetch timeout sends an update voter
+        for (int i = 0; i < 3; i++) {
+            context.time.sleep(context.fetchTimeoutMs - 1);
+            context.pollUntilRequest();
+            RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
+            context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
 
-        // Establish a HWM and fence previous leaders
-        context.deliverRequest(
-            context.fetchRequest(epoch, follower, context.log.endOffset().offset(), epoch, 0)
-        );
-        context.pollUntilResponse();
-        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id()));
+            context.deliverResponse(
+                fetchRequest.correlationId(),
+                fetchRequest.destination(),
+                context.fetchResponse(
+                    epoch,
+                    voter1.id(),
+                    MemoryRecords.EMPTY,
+                    0L,
+                    Errors.NONE
+                )
+            );
+            // poll kraft to handle the fetch response
+            context.client.poll();
+        }
 
-        // Attempt to update the follower
-        InetSocketAddress defaultAddress = InetSocketAddress.createUnresolved(
-            "localhost",
-            9990 + follower.id()
-        );
-        InetSocketAddress newAddress = InetSocketAddress.createUnresolved(
-            "localhost",
-            8990 + follower.id()
+        context.time.sleep(context.fetchTimeoutMs - 1);
+        context.pollUntilRequest();
+        RaftRequest.Outbound updateRequest = context.assertSentUpdateVoterRequest(
+            local,
+            epoch,
+            Features.KRAFT_VERSION.supportedVersionRange(),
+            localListeners
         );
-        HashMap<ListenerName, InetSocketAddress> listenersMap = new HashMap<>(2);
-        listenersMap.put(context.channel.listenerName(), defaultAddress);
-        listenersMap.put(ListenerName.normalised("ANOTHER_LISTENER"), newAddress);
-        Endpoints newListeners = Endpoints.fromInetSocketAddresses(listenersMap);
-        context.deliverRequest(
-            context.updateVoterRequest(
-                follower,
-                Features.KRAFT_VERSION.supportedVersionRange(),
-                newListeners
+        context.deliverResponse(
+            updateRequest.correlationId(),
+            updateRequest.destination(),
+            context.updateVoterResponse(
+                Errors.NONE,
+                new LeaderAndEpoch(OptionalInt.of(voter1.id()), epoch)
             )
         );
 
-        // Handle the update voter request
-        context.client.poll();
-        // Append the VotersRecord to the log
-        context.client.poll();
-
-        // Leader completes the UpdateVoter RPC when resigning
-        context.client.resign(epoch);
-        context.pollUntilResponse();
-        context.assertSentUpdateVoterResponse(
-            Errors.NOT_LEADER_OR_FOLLOWER,
-            OptionalInt.of(local.id()),
-            epoch
-        );
+        // after sending an update voter the next request should be a fetch
+        context.pollUntilRequest();
+        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
     }
 
     @Test
-    void testUpdateVoterWithPendingAddVoter() throws Exception {
+    void testFollowerSendsUpdateVoterWhenDifferent() throws Exception {
         ReplicaKey local = replicaKey(randomeReplicaId(), true);
-        ReplicaKey follower = replicaKey(local.id() + 1, true);
+        ReplicaKey voter1 = replicaKey(local.id() + 1, true);
+        ReplicaKey voter2 = replicaKey(local.id() + 2, true);
 
-        VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower));
+        VoterSet voters = VoterSetTest.voterSet(Stream.of(local, voter1, voter2));
+        int epoch = 4;
 
         RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get())
             .withKip853Rpc(true)
             .withBootstrapSnapshot(Optional.of(voters))
-            .withUnknownLeader(3)
+            .withElectedLeader(epoch, voter1.id())
             .build();
 
-        context.becomeLeader();
-        int epoch = context.currentEpoch();
+        // waiting for 3 times the fetch timeout sends an update voter
+        for (int i = 0; i < 3; i++) {
+            context.time.sleep(context.fetchTimeoutMs - 1);
+            context.pollUntilRequest();
+            RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
+            context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
 
-        ReplicaKey newVoter = replicaKey(local.id() + 2, true);
-        InetSocketAddress newVoterAddress = InetSocketAddress.createUnresolved(
-            "localhost",
-            9990 + newVoter.id()
-        );
-        Endpoints newVoterListeners = Endpoints.fromInetSocketAddresses(
-            Collections.singletonMap(context.channel.listenerName(), newVoterAddress)
-        );
+            context.deliverResponse(
+                fetchRequest.correlationId(),
+                fetchRequest.destination(),
+                context.fetchResponse(
+                    epoch,
+                    voter1.id(),
+                    MemoryRecords.EMPTY,
+                    0L,
+                    Errors.NONE
+                )
+            );
+            // poll kraft to handle the fetch response
+            context.client.poll();
+        }
 
-        // Establish a HWM and fence previous leaders
-        context.deliverRequest(
-            context.fetchRequest(epoch, follower, context.log.endOffset().offset(), epoch, 0)
-        );
-        context.pollUntilResponse();
-        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id()));
+        // update voter should not be sent because the local listener is not different from the voter set
+        context.time.sleep(context.fetchTimeoutMs - 1);
+        context.pollUntilRequest();
+        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+    }
 
-        // Catch up the new voter to the leader's LEO
-        context.deliverRequest(
-            context.fetchRequest(epoch, newVoter, context.log.endOffset().offset(), epoch, 0)
-        );
-        context.pollUntilResponse();
-        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id()));
+    @Test
+    void testUpdateVoterResponseCausesEpochChange() throws Exception {
+        ReplicaKey local = replicaKey(randomeReplicaId(), true);
+        ReplicaKey voter1 = replicaKey(local.id() + 1, true);
+        ReplicaKey voter2 = replicaKey(local.id() + 2, true);
 
-        // Attempt to add new voter to the quorum
-        context.deliverRequest(context.addVoterRequest(Integer.MAX_VALUE, newVoter, newVoterListeners));
+        VoterSet voters = VoterSetTest.voterSet(Stream.of(local, voter1, voter2));
+        int epoch = 4;
 
-        // Attempt to update the follower
-        InetSocketAddress defaultAddress = InetSocketAddress.createUnresolved(
-            "localhost",
-            9990 + follower.id()
-        );
-        InetSocketAddress newAddress = InetSocketAddress.createUnresolved(
-            "localhost",
-            8990 + follower.id()
-        );
         HashMap<ListenerName, InetSocketAddress> listenersMap = new HashMap<>(2);
-        listenersMap.put(context.channel.listenerName(), defaultAddress);
-        listenersMap.put(ListenerName.normalised("ANOTHER_LISTENER"), newAddress);
-        Endpoints newListeners = Endpoints.fromInetSocketAddresses(listenersMap);
-        context.deliverRequest(
-            context.updateVoterRequest(
-                follower,
-                Features.KRAFT_VERSION.supportedVersionRange(),
-                newListeners
-            )
+        listenersMap.put(
+            VoterSetTest.DEFAULT_LISTENER_NAME,
+            InetSocketAddress.createUnresolved("localhost", 9990 + local.id())
         );
-        context.pollUntilResponse();
-        context.assertSentUpdateVoterResponse(
-            Errors.REQUEST_TIMED_OUT,
-            OptionalInt.of(local.id()),
-            epoch
+        listenersMap.put(
+            ListenerName.normalised("ANOTHER_LISTENER"),
+            InetSocketAddress.createUnresolved("localhost", 8990 + local.id())
         );
-    }
-
-    @Test
-    void testRemoveVoterWithPendingUpdateVoter() throws Exception {
-        ReplicaKey local = replicaKey(randomeReplicaId(), true);
-        ReplicaKey follower = replicaKey(local.id() + 1, true);
-
-        VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower));
+        Endpoints localListeners = Endpoints.fromInetSocketAddresses(listenersMap);
 
         RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get())
             .withKip853Rpc(true)
             .withBootstrapSnapshot(Optional.of(voters))
-            .withUnknownLeader(3)
+            .withElectedLeader(epoch, voter1.id())
+            .withLocalListeners(localListeners)
             .build();
 
-        context.becomeLeader();
-        int epoch = context.currentEpoch();
+        // waiting for 3 times the fetch timeout sends an update voter
+        for (int i = 0; i < 3; i++) {
+            context.time.sleep(context.fetchTimeoutMs - 1);
+            context.pollUntilRequest();
+            RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
+            context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
 
-        // Establish a HWM and fence previous leaders
-        context.deliverRequest(
-            context.fetchRequest(epoch, follower, context.log.endOffset().offset(), epoch, 0)
-        );
-        context.pollUntilResponse();
-        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id()));
+            context.deliverResponse(
+                fetchRequest.correlationId(),
+                fetchRequest.destination(),
+                context.fetchResponse(
+                    epoch,
+                    voter1.id(),
+                    MemoryRecords.EMPTY,
+                    0L,
+                    Errors.NONE
+                )
+            );
+            // poll kraft to handle the fetch response
+            context.client.poll();
+        }
 
-        // Attempt to update the follower
-        InetSocketAddress defaultAddress = InetSocketAddress.createUnresolved(
-            "localhost",
-            9990 + follower.id()
-        );
-        InetSocketAddress newAddress = InetSocketAddress.createUnresolved(
-            "localhost",
-            8990 + follower.id()
+        context.time.sleep(context.fetchTimeoutMs - 1);
+        context.pollUntilRequest();
+        RaftRequest.Outbound updateRequest = context.assertSentUpdateVoterRequest(
+            local,
+            epoch,
+            Features.KRAFT_VERSION.supportedVersionRange(),
+            localListeners
         );
-        HashMap<ListenerName, InetSocketAddress> listenersMap = new HashMap<>(2);
-        listenersMap.put(context.channel.listenerName(), defaultAddress);
-        listenersMap.put(ListenerName.normalised("ANOTHER_LISTENER"), newAddress);
-        Endpoints newListeners = Endpoints.fromInetSocketAddresses(listenersMap);
-        context.deliverRequest(
-            context.updateVoterRequest(
-                follower,
-                Features.KRAFT_VERSION.supportedVersionRange(),
-                newListeners
+        context.deliverResponse(
+            updateRequest.correlationId(),
+            updateRequest.destination(),
+            context.updateVoterResponse(
+                Errors.NONE,
+                new LeaderAndEpoch(OptionalInt.of(voter2.id()), epoch + 1)
             )
         );
 
-        // Attempt to remove follower while UpdateVoter is pending
-        context.deliverRequest(context.removeVoterRequest(follower));
-        context.pollUntilResponse();
-        context.assertSentRemoveVoterResponse(Errors.REQUEST_TIMED_OUT);
+        // check that there is a fetch to the new leader
+        context.pollUntilRequest();
+        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
+        context.assertFetchRequestData(fetchRequest, epoch + 1, 0L, 0);
+        assertEquals(voter2.id(), fetchRequest.destination().id());
     }
 
     private static void verifyVotersRecord(
diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
index 04ff1ed29c2..8ce18ec9d44 100644
--- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
@@ -23,7 +23,6 @@ import org.apache.kafka.raft.internals.BatchAccumulator;
 import org.apache.kafka.raft.internals.ReplicaKey;
 import org.apache.kafka.raft.internals.VoterSet;
 import org.apache.kafka.raft.internals.VoterSetTest;
-import org.apache.kafka.server.common.Features;
 import org.apache.kafka.server.common.KRaftVersion;
 
 import org.junit.jupiter.api.Test;
@@ -76,7 +75,6 @@ public class LeaderStateTest {
             voters.voterIds(),
             accumulator,
             voters.listeners(localReplicaKey.id()),
-            Features.KRAFT_VERSION.supportedVersionRange(),
             fetchTimeoutMs,
             logContext
         );
@@ -122,7 +120,6 @@ public class LeaderStateTest {
                 Collections.emptySet(),
                 null,
                 Endpoints.empty(),
-                Features.KRAFT_VERSION.supportedVersionRange(),
                 fetchTimeoutMs,
                 logContext
             )
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index d7097d6b7f5..bbe9ddf8bf0 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -1166,6 +1166,30 @@ public final class RaftClientTestContext {
         return removeVoterResponse;
     }
 
+    RaftRequest.Outbound assertSentUpdateVoterRequest(
+        ReplicaKey replicaKey,
+        int epoch,
+        SupportedVersionRange supportedVersions,
+        Endpoints endpoints
+    ) {
+        List<RaftRequest.Outbound> sentRequests = channel.drainSentRequests(Optional.of(ApiKeys.UPDATE_RAFT_VOTER));
+        assertEquals(1, sentRequests.size());
+
+        RaftRequest.Outbound request = sentRequests.get(0);
+        assertInstanceOf(UpdateRaftVoterRequestData.class, request.data());
+
+        UpdateRaftVoterRequestData updateVoterRequest = (UpdateRaftVoterRequestData) request.data();
+        assertEquals(clusterId, updateVoterRequest.clusterId());
+        assertEquals(epoch, updateVoterRequest.currentLeaderEpoch());
+        assertEquals(replicaKey.id(), updateVoterRequest.voterId());
+        assertEquals(replicaKey.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID), updateVoterRequest.voterDirectoryId());
+        assertEquals(endpoints, Endpoints.fromUpdateVoterRequest(updateVoterRequest.listeners()));
+        assertEquals(supportedVersions.min(), updateVoterRequest.kRaftVersionFeature().minSupportedVersion());
+        assertEquals(supportedVersions.max(), updateVoterRequest.kRaftVersionFeature().maxSupportedVersion());
+
+        return request;
+    }
+
     UpdateRaftVoterResponseData assertSentUpdateVoterResponse(
         Errors error,
         OptionalInt leaderId,
@@ -1726,6 +1750,20 @@ public final class RaftClientTestContext {
         return RaftUtil.updateVoterRequest(clusterId, voter, epoch, supportedVersions, endpoints);
     }
 
+    UpdateRaftVoterResponseData updateVoterResponse(
+        Errors error,
+        LeaderAndEpoch leaderAndEpoch
+    ) {
+        return RaftUtil.updateVoterResponse(
+            error,
+            channel.listenerName(),
+            leaderAndEpoch,
+            leaderAndEpoch.leaderId().isPresent() ?
+                startingVoters.listeners(leaderAndEpoch.leaderId().getAsInt()) :
+                Endpoints.empty()
+        );
+    }
+
     private short fetchRpcVersion() {
         if (kip853Rpc) {
             return 17;

Reply via email to