This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch jsancio_KAFKA-16535
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 2dbb6d7aa6622fe87b61fbb444fe075085870c3b
Author: Colin P. McCabe <[email protected]>
AuthorDate: Tue Jul 16 12:16:56 2024 -0700

    KAFKA-16535; Implement KRaft add voter handling
---
 .../common/errors/DuplicateVoterException.java     |  30 +
 .../org/apache/kafka/common/protocol/Errors.java   |   4 +-
 .../kafka/common/requests/ApiVersionsRequest.java  |   2 +-
 .../main/scala/kafka/server/ControllerApis.scala   |   2 +-
 .../main/java/org/apache/kafka/raft/Endpoints.java |  28 +
 .../org/apache/kafka/raft/KafkaRaftClient.java     | 150 ++++-
 .../java/org/apache/kafka/raft/LeaderState.java    |  43 +-
 .../main/java/org/apache/kafka/raft/RaftUtil.java  |  35 ++
 .../kafka/raft/internals/AddVoterHandler.java      | 379 ++++++++++++
 .../kafka/raft/internals/AddVoterHandlerState.java |  92 +++
 .../kafka/raft/internals/BatchAccumulator.java     |  30 +-
 .../kafka/raft/internals/DefaultRequestSender.java | 110 ++++
 .../internals/KRaftControlRecordStateMachine.java  |  17 +-
 .../apache/kafka/raft/internals/RequestSender.java |  53 ++
 .../org/apache/kafka/raft/internals/VoterSet.java  |   8 +
 .../kafka/raft/internals/VoterSetHistory.java      |   7 +
 .../kafka/raft/KafkaRaftClientReconfigTest.java    | 663 ++++++++++++++++++++-
 .../apache/kafka/raft/RaftClientTestContext.java   |  80 ++-
 18 files changed, 1702 insertions(+), 31 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/errors/DuplicateVoterException.java
 
b/clients/src/main/java/org/apache/kafka/common/errors/DuplicateVoterException.java
new file mode 100644
index 00000000000..11df6eaced2
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/errors/DuplicateVoterException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class DuplicateVoterException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public DuplicateVoterException(String message) {
+        super(message);
+    }
+
+    public DuplicateVoterException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index ded58495e6d..9363d827267 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -34,6 +34,7 @@ import 
org.apache.kafka.common.errors.DelegationTokenOwnerMismatchException;
 import org.apache.kafka.common.errors.DuplicateBrokerRegistrationException;
 import org.apache.kafka.common.errors.DuplicateResourceException;
 import org.apache.kafka.common.errors.DuplicateSequenceException;
+import org.apache.kafka.common.errors.DuplicateVoterException;
 import org.apache.kafka.common.errors.ElectionNotNeededException;
 import org.apache.kafka.common.errors.EligibleLeadersNotAvailableException;
 import org.apache.kafka.common.errors.FeatureUpdateFailedException;
@@ -405,7 +406,8 @@ public enum Errors {
     SHARE_SESSION_NOT_FOUND(122, "The share session was not found.", 
ShareSessionNotFoundException::new),
     INVALID_SHARE_SESSION_EPOCH(123, "The share session epoch is invalid.", 
InvalidShareSessionEpochException::new),
     FENCED_STATE_EPOCH(124, "The share coordinator rejected the request 
because the share-group state epoch did not match.", 
FencedStateEpochException::new),
-    INVALID_VOTER_KEY(125, "The voter key doesn't match the receiving 
replica's key.", InvalidVoterKeyException::new);
+    INVALID_VOTER_KEY(125, "The voter key doesn't match the receiving 
replica's key.", InvalidVoterKeyException::new),
+    DUPLICATE_VOTER(126, "The voter is already part of the set of voters.", 
DuplicateVoterException::new);
 
     private static final Logger log = LoggerFactory.getLogger(Errors.class);
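
Illustrative sketch (not part of the patch): how the new DUPLICATE_VOTER constant resolves
through the standard Errors helpers; the message passed to exception() is a made-up example.

    import org.apache.kafka.common.errors.ApiException;
    import org.apache.kafka.common.errors.DuplicateVoterException;
    import org.apache.kafka.common.protocol.Errors;

    public final class DuplicateVoterErrorExample {
        public static void main(String[] args) {
            // 126 is the code registered for DUPLICATE_VOTER above
            Errors error = Errors.forCode((short) 126);
            System.out.println(error.message());   // "The voter is already part of the set of voters."

            // The constructor reference registered above produces the new exception type
            ApiException exception = error.exception("voter 5 is already part of the voter set");
            System.out.println(exception instanceof DuplicateVoterException);   // true
        }
    }
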
 
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
index 6a1165cbecb..c286ceaf2d3 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
@@ -30,7 +30,7 @@ import java.util.regex.Pattern;
 public class ApiVersionsRequest extends AbstractRequest {
 
     public static class Builder extends 
AbstractRequest.Builder<ApiVersionsRequest> {
-        private static final String DEFAULT_CLIENT_SOFTWARE_NAME = 
"apache-kafka-java";
+        public static final String DEFAULT_CLIENT_SOFTWARE_NAME = 
"apache-kafka-java";
 
         private static final ApiVersionsRequestData DATA = new 
ApiVersionsRequestData()
             .setClientSoftwareName(DEFAULT_CLIENT_SOFTWARE_NAME)
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala 
b/core/src/main/scala/kafka/server/ControllerApis.scala
index 59d5d0bef10..1730bd9a4cc 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -1086,7 +1086,7 @@ class ControllerApis(
 
   def handleAddRaftVoter(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
     authHelper.authorizeClusterOperation(request, ALTER)
-    throw new UnsupportedVersionException("handleAddRaftVoter is not supported 
yet.")
+    handleRaftRequest(request, response => new 
AddRaftVoterResponse(response.asInstanceOf[AddRaftVoterResponseData]))
   }
 
   def handleRemoveRaftVoter(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
diff --git a/raft/src/main/java/org/apache/kafka/raft/Endpoints.java 
b/raft/src/main/java/org/apache/kafka/raft/Endpoints.java
index 0704e5ddf7b..41292252ae6 100644
--- a/raft/src/main/java/org/apache/kafka/raft/Endpoints.java
+++ b/raft/src/main/java/org/apache/kafka/raft/Endpoints.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.raft;
 
+import org.apache.kafka.common.message.AddRaftVoterRequestData;
 import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
 import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
 import org.apache.kafka.common.message.EndQuorumEpochRequestData;
@@ -101,6 +102,21 @@ public final class Endpoints {
         return leaderEndpoints;
     }
 
+    public AddRaftVoterRequestData.ListenerCollection toAddVoterRequest() {
+        AddRaftVoterRequestData.ListenerCollection listeners =
+            new AddRaftVoterRequestData.ListenerCollection(endpoints.size());
+        for (Map.Entry<ListenerName, InetSocketAddress> entry : 
endpoints.entrySet()) {
+            listeners.add(
+                new AddRaftVoterRequestData.Listener()
+                    .setName(entry.getKey().value())
+                    .setHost(entry.getValue().getHostString())
+                    .setPort(entry.getValue().getPort())
+            );
+        }
+
+        return listeners;
+    }
+
     private static final Endpoints EMPTY = new 
Endpoints(Collections.emptyMap());
     public static Endpoints empty() {
         return EMPTY;
@@ -230,4 +246,16 @@ public final class Endpoints {
             )
             .orElse(Endpoints.empty());
     }
+
+    public static Endpoints 
fromAddVoterRequest(AddRaftVoterRequestData.ListenerCollection endpoints) {
+        Map<ListenerName, InetSocketAddress> listeners = new 
HashMap<>(endpoints.size());
+        for (AddRaftVoterRequestData.Listener endpoint : endpoints) {
+            listeners.put(
+                ListenerName.normalised(endpoint.name()),
+                InetSocketAddress.createUnresolved(endpoint.host(), 
endpoint.port())
+            );
+        }
+
+        return new Endpoints(listeners);
+    }
 }
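
Illustrative sketch (not part of the patch): a round trip between the AddRaftVoter wire
listeners and the internal Endpoints representation using the two converters added above;
the listener name, host and port are made-up values.

    import org.apache.kafka.common.message.AddRaftVoterRequestData;
    import org.apache.kafka.raft.Endpoints;

    public final class AddVoterListenersExample {
        public static void main(String[] args) {
            // Listeners as they would arrive in an AddRaftVoter request
            AddRaftVoterRequestData.ListenerCollection wireListeners =
                new AddRaftVoterRequestData.ListenerCollection(1);
            wireListeners.add(
                new AddRaftVoterRequestData.Listener()
                    .setName("CONTROLLER")
                    .setHost("controller-4.example.com")
                    .setPort(9093)
            );

            // Convert to the internal representation and back to the wire format
            Endpoints endpoints = Endpoints.fromAddVoterRequest(wireListeners);
            AddRaftVoterRequestData.ListenerCollection roundTrip = endpoints.toAddVoterRequest();
            for (AddRaftVoterRequestData.Listener listener : roundTrip) {
                System.out.println(listener.name() + " -> " + listener.host() + ":" + listener.port());
            }
        }
    }
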
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java 
b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 17bf64f4666..a359b75e862 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -24,6 +24,9 @@ import org.apache.kafka.common.compress.Compression;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
 import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.message.AddRaftVoterRequestData;
+import org.apache.kafka.common.message.AddRaftVoterResponseData;
+import org.apache.kafka.common.message.ApiVersionsResponseData;
 import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
 import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
 import org.apache.kafka.common.message.DescribeQuorumRequestData;
@@ -57,10 +60,12 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Timer;
 import org.apache.kafka.raft.errors.NotLeaderException;
+import org.apache.kafka.raft.internals.AddVoterHandler;
 import org.apache.kafka.raft.internals.BatchAccumulator;
 import org.apache.kafka.raft.internals.BatchMemoryPool;
 import org.apache.kafka.raft.internals.BlockingMessageQueue;
 import org.apache.kafka.raft.internals.CloseListener;
+import org.apache.kafka.raft.internals.DefaultRequestSender;
 import org.apache.kafka.raft.internals.FuturePurgatory;
 import org.apache.kafka.raft.internals.KRaftControlRecordStateMachine;
 import org.apache.kafka.raft.internals.KafkaRaftMetrics;
@@ -196,6 +201,9 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
     private volatile QuorumState quorum;
     private volatile RequestManager requestManager;
 
+    // Specialized handlers
+    private volatile AddVoterHandler addVoterHandler;
+
     /**
      * Create a new instance.
      *
@@ -333,6 +341,10 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
             logger.debug("Leader high watermark updated to {}", highWatermark);
             log.updateHighWatermark(highWatermark);
 
+            // Notify the add voter handler that the HWM has been updated in case there
+            // are add voter requests that need to be completed
+            addVoterHandler.highWatermarkUpdated(state);
+
             // After updating the high watermark, we first clear the append
             // purgatory so that we have an opportunity to route the pending
             // records still held in memory directly to the listener
@@ -351,8 +363,8 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
                 // Send snapshot to the listener, if the listener is at the 
beginning of the log and there is a snapshot,
                 // or the listener is trying to read an offset for which there 
isn't a segment in the log.
                 if (nextExpectedOffset < highWatermark &&
-                    ((nextExpectedOffset == 0 && latestSnapshot().isPresent()) 
||
-                     nextExpectedOffset < log.startOffset())
+                    ((nextExpectedOffset == 
ListenerContext.STARTING_NEXT_OFFSET || nextExpectedOffset < log.startOffset())
+                     && latestSnapshot().isPresent())
                 ) {
                     SnapshotReader<T> snapshot = 
latestSnapshot().orElseThrow(() -> new IllegalStateException(
                         String.format(
@@ -364,6 +376,20 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
                         )
                     ));
                     listenerContext.fireHandleSnapshot(snapshot);
+                } else if (nextExpectedOffset == 
ListenerContext.STARTING_NEXT_OFFSET) {
+                    // Reset the next offset to 0 since it is a new listener context
+                    // and there is no bootstrapping checkpoint
+                    listenerContext.resetOffsetForEmptyPartition();
+                } else if (nextExpectedOffset < log.startOffset()) {
+                    throw new IllegalStateException(
+                        String.format(
+                            "Snapshot expected since next offset of %s is %d, 
log start offset is %d and high-watermark is %d",
+                            listenerContext.listenerName(),
+                            nextExpectedOffset,
+                            log.startOffset(),
+                            highWatermark
+                        )
+                    );
                 }
             });
 
@@ -489,6 +515,19 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
         if (quorum.isOnlyVoter() && !quorum.isCandidate()) {
             transitionToCandidate(currentTimeMs);
         }
+
+        // Specialized handlers
+        this.addVoterHandler = new AddVoterHandler(
+            partitionState,
+            new DefaultRequestSender(
+                requestManager,
+                channel,
+                messageQueue,
+                logContext
+            ),
+            time,
+            logContext
+        );
     }
 
     @Override
@@ -1970,6 +2009,80 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
         return true;
     }
 
+    private CompletableFuture<AddRaftVoterResponseData> handleAddVoterRequest(
+        RaftRequest.Inbound requestMetadata,
+        long currentTimeMs
+    ) {
+        AddRaftVoterRequestData data = (AddRaftVoterRequestData) 
requestMetadata.data();
+
+        if (!hasValidClusterId(data.clusterId())) {
+            return completedFuture(
+                new 
AddRaftVoterResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code())
+            );
+        }
+
+        Optional<Errors> leaderValidation = 
validateLeaderOnlyRequest(quorum.epoch());
+        if (leaderValidation.isPresent()) {
+            return completedFuture(
+                new 
AddRaftVoterResponseData().setErrorCode(leaderValidation.get().code())
+            );
+        }
+
+        Optional<ReplicaKey> newVoter = RaftUtil.addVoterRequestVoterKey(data);
+        if (!newVoter.isPresent()) {
+            return completedFuture(
+                new AddRaftVoterResponseData()
+                    .setErrorCode(Errors.INVALID_REQUEST.code())
+                    .setErrorMessage("Add voter request didn't include a valid 
voter")
+            );
+        }
+
+        Endpoints newVoterEndpoints = 
Endpoints.fromAddVoterRequest(data.listeners());
+        if (!newVoterEndpoints.address(channel.listenerName()).isPresent()) {
+            return completedFuture(
+                new AddRaftVoterResponseData()
+                    .setErrorCode(Errors.INVALID_REQUEST.code())
+                    .setErrorMessage(
+                        String.format(
+                            "Add voter request didn't include the default 
listener: %s",
+                            channel.listenerName()
+                        )
+                    )
+            );
+        }
+
+        return addVoterHandler.handleAddVoterRequest(
+            quorum.leaderStateOrThrow(),
+            newVoter.get(),
+            newVoterEndpoints,
+            currentTimeMs
+        );
+    }
+
+    private boolean handleApiVersionsResponse(
+        RaftResponse.Inbound responseMetadata,
+        long currentTimeMs
+    ) {
+        if (!quorum.isLeader()) {
+            // Not the leader anymore; just ignore the API_VERSIONS response
+            return true;
+        }
+
+        ApiVersionsResponseData response = (ApiVersionsResponseData) 
responseMetadata.data();
+
+        Errors error = Errors.forCode(response.errorCode());
+        Optional<ApiVersionsResponseData.SupportedFeatureKey> 
supportedKraftVersions =
+            
Optional.ofNullable(response.supportedFeatures().find(KRaftVersion.FEATURE_NAME));
+
+        return addVoterHandler.handleApiVersionsResponse(
+            quorum.leaderStateOrThrow(),
+            responseMetadata.source(),
+            error,
+            supportedKraftVersions,
+            currentTimeMs
+        );
+    }
+
     private boolean hasConsistentLeader(int epoch, OptionalInt leaderId) {
         // Only elected leaders are sent in the request/response header, so if 
we have an elected
         // leaderId, it should be consistent with what is in the message.
@@ -2132,6 +2245,10 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
                 handledSuccessfully = handleFetchSnapshotResponse(response, 
currentTimeMs);
                 break;
 
+            case API_VERSIONS:
+                handledSuccessfully = handleApiVersionsResponse(response, 
currentTimeMs);
+                break;
+
             default:
                 throw new IllegalArgumentException("Received unexpected 
response type: " + apiKey);
         }
@@ -2224,6 +2341,10 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
                 responseFuture = 
completedFuture(handleFetchSnapshotRequest(request, currentTimeMs));
                 break;
 
+            case ADD_RAFT_VOTER:
+                responseFuture = handleAddVoterRequest(request, currentTimeMs);
+                break;
+
             default:
                 throw new IllegalArgumentException("Unexpected request type " 
+ apiKey);
         }
@@ -2567,6 +2688,8 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
             return 0L;
         }
 
+        long timeUntilAddVoterExpires = addVoterHandler.maybeExpirePendingOperation(state, currentTimeMs);
+
         long timeUntilFlush = maybeAppendBatches(
             state,
             currentTimeMs
@@ -2577,7 +2700,16 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
             currentTimeMs
         );
 
-        return Math.min(timeUntilFlush, Math.min(timeUntilNextBeginQuorumSend, 
timeUntilCheckQuorumExpires));
+        return Math.min(
+            timeUntilFlush,
+            Math.min(
+                timeUntilNextBeginQuorumSend,
+                Math.min(
+                    timeUntilCheckQuorumExpires,
+                    timeUntilAddVoterExpires
+                )
+            )
+        );
     }
 
     private long maybeSendVoteRequests(
@@ -3154,6 +3286,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
     }
 
     private final class ListenerContext implements 
CloseListener<BatchReader<T>> {
+        private static final long STARTING_NEXT_OFFSET = -1;
         private final RaftClient.Listener<T> listener;
         // This field is used only by the Raft IO thread
         private LeaderAndEpoch lastFiredLeaderChange = LeaderAndEpoch.UNKNOWN;
@@ -3161,7 +3294,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
         // These fields are visible to both the Raft IO thread and the listener
         // and are protected through synchronization on this ListenerContext 
instance
         private BatchReader<T> lastSent = null;
-        private long nextOffset = 0;
+        private long nextOffset = STARTING_NEXT_OFFSET;
 
         private ListenerContext(Listener<T> listener) {
             this.listener = listener;
@@ -3175,6 +3308,15 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
             return nextOffset;
         }
 
+        /**
+         * Sets the nextOffset to zero.
+         *
+         * This is done when the partition is empty: there is no log and no bootstrapping snapshot.
+         */
+        private synchronized void resetOffsetForEmptyPartition() {
+            nextOffset = 0;
+        }
+
         /**
          * Get the next expected offset, which might be larger than the last 
acked
          * offset if there are inflight batches which have not been acked yet.
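
Illustrative sketch (not part of the patch): how a caller of the AddRaftVoter RPC might
interpret the error codes produced by the validations in handleAddVoterRequest and
AddVoterHandler; the helper and its retry policy are assumptions, not part of the patch.

    import org.apache.kafka.common.errors.RetriableException;
    import org.apache.kafka.common.message.AddRaftVoterResponseData;
    import org.apache.kafka.common.protocol.Errors;

    final class AddVoterResponses {
        // Hypothetical helper: decide whether an AddRaftVoter response is worth retrying
        static boolean isRetriable(AddRaftVoterResponseData response) {
            Errors error = Errors.forCode(response.errorCode());
            switch (error) {
                case NONE:
                    return false;   // the voter was added and the new VotersRecord committed
                case REQUEST_TIMED_OUT:
                    return true;    // pending change, no HWM yet, or the new voter still lagging
                case INCONSISTENT_CLUSTER_ID:
                case INVALID_REQUEST:
                case DUPLICATE_VOTER:
                case UNSUPPORTED_VERSION:
                    return false;   // the request itself must change before a retry can succeed
                default:
                    return error.exception() instanceof RetriableException;
            }
        }
    }
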
diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java 
b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
index 9cdfbbf3d84..fc43713a748 100644
--- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Timer;
+import org.apache.kafka.raft.internals.AddVoterHandlerState;
 import org.apache.kafka.raft.internals.BatchAccumulator;
 import org.apache.kafka.raft.internals.ReplicaKey;
 import org.apache.kafka.raft.internals.VoterSet;
@@ -71,6 +72,7 @@ public class LeaderState<T> implements EpochState {
 
     private Optional<LogOffsetMetadata> highWatermark = Optional.empty();
     private Map<Integer, ReplicaState> voterStates = new HashMap<>();
+    private Optional<AddVoterHandlerState> addVoterHandlerState = 
Optional.empty();
     private final Map<ReplicaKey, ReplicaState> observerStates = new 
HashMap<>();
     private final Logger log;
     private final BatchAccumulator<T> accumulator;
@@ -198,7 +200,24 @@ public class LeaderState<T> implements EpochState {
     }
 
     public BatchAccumulator<T> accumulator() {
-        return this.accumulator;
+        return accumulator;
+    }
+
+    public Optional<AddVoterHandlerState> addVoterHandlerState() {
+        return addVoterHandlerState;
+    }
+
+    public void resetAddVoterHandlerState(
+        Errors error,
+        String message,
+        Optional<AddVoterHandlerState> state
+    ) {
+        addVoterHandlerState.ifPresent(
+            handlerState -> handlerState
+                .future()
+                .complete(RaftUtil.addVoterResponse(error, message))
+        );
+        addVoterHandlerState = state;
     }
 
     private static List<Voter> convertToVoters(Set<Integer> voterIds) {
@@ -273,10 +292,29 @@ public class LeaderState<T> implements EpochState {
         accumulator.forceDrain();
     }
 
+    public long appendVotersRecord(VoterSet voters, long currentTimeMs) {
+        return accumulator.appendVotersRecord(
+            
voters.toVotersRecord(ControlRecordUtils.KRAFT_VOTERS_CURRENT_VERSION),
+            currentTimeMs
+        );
+    }
+
     public boolean isResignRequested() {
         return resignRequested;
     }
 
+    public boolean isReplicaCaughtUp(ReplicaKey replicaKey, long 
currentTimeMs) {
+        // For the purposes of adding a voter, consider a replica caught up if it has
+        // fetched within the last hour
+        return Optional.ofNullable(observerStates.get(replicaKey))
+            .map(state ->
+                state.lastCaughtUpTimestamp > 0 &&
+                state.lastFetchTimestamp > 0 &&
+                state.lastFetchTimestamp > currentTimeMs - 3600000
+            )
+            .orElse(false);
+    }
+
     public void requestResign() {
         this.resignRequested = true;
     }
@@ -510,7 +548,7 @@ public class LeaderState<T> implements EpochState {
         return state;
     }
 
-    private Optional<ReplicaState> getReplicaState(ReplicaKey replicaKey) {
+    public Optional<ReplicaState> getReplicaState(ReplicaKey replicaKey) {
         ReplicaState state = voterStates.get(replicaKey.id());
         if (state == null || !state.matchesKey(replicaKey)) {
             state = observerStates.get(replicaKey);
@@ -749,6 +787,7 @@ public class LeaderState<T> implements EpochState {
 
     @Override
     public void close() {
+        addVoterHandlerState.ifPresent(AddVoterHandlerState::close);
         accumulator.close();
     }
 }
diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java 
b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
index c28864b2f2b..839f90c0819 100644
--- a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
+++ b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
@@ -18,6 +18,8 @@ package org.apache.kafka.raft;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.AddRaftVoterRequestData;
+import org.apache.kafka.common.message.AddRaftVoterResponseData;
 import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
 import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
 import org.apache.kafka.common.message.DescribeQuorumRequestData;
@@ -500,6 +502,31 @@ public class RaftUtil {
         return response;
     }
 
+    public static AddRaftVoterRequestData addVoterRequest(
+        String clusterId,
+        int timeoutMs,
+        ReplicaKey voter,
+        Endpoints listeners
+    ) {
+        return new AddRaftVoterRequestData()
+            .setClusterId(clusterId)
+            .setTimeoutMs(timeoutMs)
+            .setVoterId(voter.id())
+            
.setVoterDirectoryId(voter.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID))
+            .setListeners(listeners.toAddVoterRequest());
+    }
+
+    public static AddRaftVoterResponseData addVoterResponse(
+        Errors error,
+        String errorMessage
+    ) {
+        errorMessage = errorMessage == null ? error.message() : errorMessage;
+
+        return new AddRaftVoterResponseData()
+            .setErrorCode(error.code())
+            .setErrorMessage(errorMessage);
+    }
+
     public static Optional<ReplicaKey> voteRequestVoterKey(
         VoteRequestData request,
         VoteRequestData.PartitionData partition
@@ -522,6 +549,14 @@ public class RaftUtil {
         }
     }
 
+    public static Optional<ReplicaKey> 
addVoterRequestVoterKey(AddRaftVoterRequestData request) {
+        if (request.voterId() < 0) {
+            return Optional.empty();
+        } else {
+            return Optional.of(ReplicaKey.of(request.voterId(), 
request.voterDirectoryId()));
+        }
+    }
+
     static boolean hasValidTopicPartition(FetchRequestData data, 
TopicPartition topicPartition, Uuid topicId) {
         return data.topics().size() == 1 &&
             data.topics().get(0).topicId().equals(topicId) &&
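
Illustrative sketch (not part of the patch): building an AddRaftVoter request and response
with the new RaftUtil helpers; the cluster id, voter id, timeout and empty listeners are
placeholders (a real caller passes the new voter's advertised listeners).

    import org.apache.kafka.common.Uuid;
    import org.apache.kafka.common.message.AddRaftVoterRequestData;
    import org.apache.kafka.common.message.AddRaftVoterResponseData;
    import org.apache.kafka.common.protocol.Errors;
    import org.apache.kafka.raft.Endpoints;
    import org.apache.kafka.raft.RaftUtil;
    import org.apache.kafka.raft.internals.ReplicaKey;

    public final class AddVoterRequestExample {
        public static void main(String[] args) {
            ReplicaKey newVoter = ReplicaKey.of(4, Uuid.randomUuid());

            AddRaftVoterRequestData request = RaftUtil.addVoterRequest(
                "ABCDEFGHIJKLMNOPQRSTUV",   // clusterId (placeholder)
                30_000,                     // timeoutMs
                newVoter,                   // voter id and directory id
                Endpoints.empty()           // placeholder; normally the voter's listeners
            );
            // Recovers the ReplicaKey with id 4 from the request
            System.out.println(RaftUtil.addVoterRequestVoterKey(request));

            // A null message falls back to the error's default message
            AddRaftVoterResponseData response = RaftUtil.addVoterResponse(Errors.DUPLICATE_VOTER, null);
            System.out.println(response.errorMessage());
        }
    }
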
diff --git 
a/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandler.java 
b/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandler.java
new file mode 100644
index 00000000000..d040e87ac03
--- /dev/null
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandler.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.feature.SupportedVersionRange;
+import org.apache.kafka.common.message.AddRaftVoterResponseData;
+import org.apache.kafka.common.message.ApiVersionsRequestData;
+import org.apache.kafka.common.message.ApiVersionsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiVersionsRequest;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.raft.Endpoints;
+import org.apache.kafka.raft.LeaderState;
+import org.apache.kafka.raft.LogOffsetMetadata;
+import org.apache.kafka.raft.RaftUtil;
+import org.apache.kafka.server.common.KRaftVersion;
+
+import org.slf4j.Logger;
+
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * This type implements the protocol for adding a voter to a KRaft partition.
+ *
+ * The general algorithm for adding a voter to the voter set is:
+ *
+ * 1. Wait until there are no uncommitted VotersRecord. Note that the 
implementation may just
+ *    return a REQUEST_TIMED_OUT error if there are pending operations.
+ * 2. Wait for the LeaderChangeMessage control record from the current epoch 
to get committed.
+ *    Note that the implementation may just return a REQUEST_TIMED_OUT error 
if there are pending
+ *    operations.
+ * 3. Send an ApiVersions RPC to the first listener to discover the supported 
kraft.version of the
+ *    new voter.
+ * 4. Check that the new voter supports the current kraft.version.
+ * 5. Wait for the fetch offset of the replica (ID, UUID) to catch up to the 
log end offset of
+ *    the leader.
+ * 6. Append the updated VotersRecord to the log.
+ * 7. The KRaft internal listener will read this record from the log and add 
the new voter to the
+ *    set of voters.
+ * 8. Wait for the VotersRecord to commit using the majority of the new set of 
voters. Return a
+ *    REQUEST_TIMED_OUT error if it doesn't succeed in time.
+ * 9. Send the AddVoter response to the client.
+ *
+ * The algorithm above could be improved as part of KAFKA-17147. Instead of returning an error
+ * immediately for steps 1 and 2, KRaft can wait with a timeout until those invariants become true.
+ */
+public final class AddVoterHandler {
+    private final KRaftControlRecordStateMachine partitionState;
+    private final RequestSender requestSender;
+    private final Time time;
+    private final Logger logger;
+
+    public AddVoterHandler(
+        KRaftControlRecordStateMachine partitionState,
+        RequestSender requestSender,
+        Time time,
+        LogContext logContext
+    ) {
+        this.partitionState = partitionState;
+        this.requestSender = requestSender;
+        this.time = time;
+        this.logger = logContext.logger(AddVoterHandler.class);
+    }
+
+    public CompletableFuture<AddRaftVoterResponseData> handleAddVoterRequest(
+        LeaderState<?> leaderState,
+        ReplicaKey voterKey,
+        Endpoints voterEndpoints,
+        long currentTimeMs
+    ) {
+        // Check if there are any pending add voter requests
+        if (isOperationPending(leaderState, currentTimeMs)) {
+            return CompletableFuture.completedFuture(
+                RaftUtil.addVoterResponse(
+                    Errors.REQUEST_TIMED_OUT,
+                    "Request timed out waiting for leader to handle previous 
add voter request"
+                )
+            );
+        }
+
+        Optional<LogHistory.Entry<VoterSet>> votersEntry = 
partitionState.lastVoterSetEntry();
+
+        // Check that the leader has established a HWM and committed the 
current epoch
+        Optional<Long> highWatermark = 
leaderState.highWatermark().map(LogOffsetMetadata::offset);
+        if (!highWatermark.isPresent()) {
+            return CompletableFuture.completedFuture(
+                RaftUtil.addVoterResponse(
+                    Errors.REQUEST_TIMED_OUT,
+                    "Request timed out waiting for leader to establish HWM and 
fence previous voter changes"
+                )
+            );
+        }
+
+        // Check that the cluster supports kraft.version >= 1
+        KRaftVersion kraftVersion = partitionState.lastKraftVersion();
+        if (!kraftVersion.isReconfigSupported()) {
+            return CompletableFuture.completedFuture(
+                RaftUtil.addVoterResponse(
+                    Errors.UNSUPPORTED_VERSION,
+                    String.format(
+                        "Cluster doesn't support adding voter because the %s 
feature is %s",
+                        kraftVersion.featureName(),
+                        kraftVersion.featureLevel()
+                    )
+                )
+            );
+        }
+
+        // Check that there are no uncommitted VotersRecord
+        if (!votersEntry.isPresent() || votersEntry.get().offset() >= 
highWatermark.get()) {
+            return CompletableFuture.completedFuture(
+                RaftUtil.addVoterResponse(
+                    Errors.REQUEST_TIMED_OUT,
+                    String.format(
+                        "Request timed out waiting for voters to commit the 
latest voter change at %s with HWM %d",
+                        votersEntry.map(LogHistory.Entry::offset),
+                        highWatermark.get()
+                    )
+                )
+            );
+        }
+
+        // Check that the new voter id is not part of the current voter set
+        VoterSet voters = votersEntry.get().value();
+        if (voters.voterIds().contains(voterKey.id())) {
+            return CompletableFuture.completedFuture(
+                RaftUtil.addVoterResponse(
+                    Errors.DUPLICATE_VOTER,
+                    String.format(
+                        "The voter id for %s is already part of the set of 
voters %s.",
+                        voterKey,
+                        voters.voterKeys()
+                    )
+                )
+            );
+        }
+
+        // Send API_VERSIONS request to new voter to discover their supported 
kraft.version range
+        OptionalLong timeout = requestSender.send(
+            voterEndpoints
+                .address(requestSender.listenerName())
+                .map(address -> new Node(voterKey.id(), address.getHostName(), 
address.getPort()))
+                .orElseThrow(
+                    () -> new IllegalArgumentException(
+                        String.format(
+                            "Provided listeners %s do not contain a listener 
for %s",
+                            voterEndpoints,
+                            requestSender.listenerName()
+                        )
+                    )
+                ),
+            this::buildApiVersionsRequest,
+            currentTimeMs
+        );
+        if (!timeout.isPresent()) {
+            return CompletableFuture.completedFuture(
+                RaftUtil.addVoterResponse(
+                    Errors.REQUEST_TIMED_OUT,
+                    String.format("New voter %s is not ready to receive 
requests", voterKey)
+                )
+            );
+        }
+
+        AddVoterHandlerState state = new AddVoterHandlerState(
+            voterKey,
+            voterEndpoints,
+            time.timer(timeout.getAsLong())
+        );
+        leaderState.resetAddVoterHandlerState(
+            Errors.UNKNOWN_SERVER_ERROR,
+            null,
+            Optional.of(state)
+        );
+
+        return state.future();
+    }
+
+    public boolean handleApiVersionsResponse(
+        LeaderState<?> leaderState,
+        Node source,
+        Errors error,
+        Optional<ApiVersionsResponseData.SupportedFeatureKey> 
supportedKraftVersions,
+        long currentTimeMs
+    ) {
+        Optional<AddVoterHandlerState> handlerState = 
leaderState.addVoterHandlerState();
+        if (!handlerState.isPresent()) {
+            // There is no pending add voter operation; just ignore the API response
+            return true;
+        }
+
+        // Check that the API_VERSIONS response matches the id of the voter 
getting added
+        AddVoterHandlerState current = handlerState.get();
+        if (!current.expectingApiResponse(source.id())) {
+            logger.info(
+                "API_VERSIONS response is not expected from {}: voterKey is 
{}, lastOffset is {}",
+                source,
+                current.voterKey(),
+                current.lastOffset()
+            );
+
+            return true;
+        }
+
+        // Abort operation if the API_VERSIONS returned an error
+        if (error != Errors.NONE) {
+            logger.info(
+                "Aborting add voter operation for {} at {} since API_VERSIONS 
returned an error {}",
+                current.voterKey(),
+                current.voterEndpoints(),
+                error
+            );
+
+            abortAddVoter(
+                leaderState,
+                Errors.REQUEST_TIMED_OUT,
+                String.format(
+                    "Aborted add voter operation for since API_VERSIONS 
returned an error %s",
+                    error
+                )
+            );
+
+            return false;
+        }
+
+        // Check that the new voter supports the kraft.version for reconfiguration
+        if (!validVersionRange(supportedKraftVersions)) {
+            logger.info(
+                "Aborting add voter operation for {} at {} since kraft.version 
range {} doesn't " +
+                "support reconfiguration",
+                current.voterKey(),
+                current.voterEndpoints(),
+                supportedKraftVersions
+            );
+
+            abortAddVoter(
+                leaderState,
+                Errors.INVALID_REQUEST,
+                String.format(
+                    "Aborted add voter operation for %s since kraft.version 
range %s doesn't " +
+                    "support reconfiguration",
+                    current.voterKey(),
+                    supportedKraftVersions
+                        .map(
+                            range -> String.format(
+                                "(min: %s, max: %s",
+                                range.minVersion(),
+                                range.maxVersion()
+                            )
+                        )
+                        .orElse("(min: 0, max: 0)")
+                )
+            );
+
+            return true;
+        }
+
+        // Check that the new voter is caught up to the LEO to avoid delays in 
HWM increases
+        if (!leaderState.isReplicaCaughtUp(current.voterKey(), currentTimeMs)) 
{
+            logger.info(
+                "Aborting add voter operation for {} at {} since it is lagging 
behind: {}",
+                current.voterKey(),
+                current.voterEndpoints(),
+                leaderState.getReplicaState(current.voterKey())
+            );
+
+            abortAddVoter(
+                leaderState,
+                Errors.REQUEST_TIMED_OUT,
+                String.format(
+                    "Aborted add voter operation for %s since it is lagging 
behind",
+                    current.voterKey()
+                )
+            );
+
+            return true;
+        }
+
+        // Add the new voter to the set of voters and append the record to the 
log
+        VoterSet newVoters = partitionState
+            .lastVoterSet()
+            .addVoter(
+                VoterSet.VoterNode.of(
+                    current.voterKey(),
+                    current.voterEndpoints(),
+                    new SupportedVersionRange(
+                        supportedKraftVersions.get().minVersion(),
+                        supportedKraftVersions.get().maxVersion()
+                    )
+                )
+            )
+            .orElseThrow(() ->
+                new IllegalStateException(
+                    String.format(
+                        "Unable to add %s to the set of voters %s",
+                        current.voterKey(),
+                        partitionState.lastVoterSet()
+                    )
+                )
+            );
+        current.setLastOffset(leaderState.appendVotersRecord(newVoters, 
currentTimeMs));
+
+        return true;
+    }
+
+    public void highWatermarkUpdated(LeaderState<?> leaderState) {
+        leaderState.addVoterHandlerState().ifPresent(current -> {
+            leaderState.highWatermark().ifPresent(highWatermark -> {
+                current.lastOffset().ifPresent(lastOffset -> {
+                    if (highWatermark.offset() > lastOffset) {
+                        // VotersRecord with the added voter was committed; 
complete the RPC
+                        completeAddVoter(leaderState);
+                    }
+                });
+            });
+        });
+    }
+
+    public long maybeExpirePendingOperation(LeaderState<?> leaderState, long 
currentTimeMs) {
+        long timeUntilOperationExpiration = leaderState
+            .addVoterHandlerState()
+            .map(state -> state.timeUntilOperationExpiration(currentTimeMs))
+            .orElse(Long.MAX_VALUE);
+
+        if (timeUntilOperationExpiration == 0) {
+            abortAddVoter(leaderState, Errors.REQUEST_TIMED_OUT, null);
+            return Long.MAX_VALUE;
+        } else {
+            return timeUntilOperationExpiration;
+        }
+    }
+
+    private ApiVersionsRequestData buildApiVersionsRequest() {
+        return new ApiVersionsRequestData()
+            
.setClientSoftwareName(ApiVersionsRequest.Builder.DEFAULT_CLIENT_SOFTWARE_NAME)
+            .setClientSoftwareVersion(AppInfoParser.getVersion());
+    }
+
+    private boolean isOperationPending(LeaderState<?> leaderState, long 
currentTimeMs) {
+        maybeExpirePendingOperation(leaderState, currentTimeMs);
+        return leaderState.addVoterHandlerState().isPresent();
+    }
+
+    private boolean validVersionRange(
+        Optional<ApiVersionsResponseData.SupportedFeatureKey> 
supportedKraftVersions
+    ) {
+        return supportedKraftVersions.isPresent() &&
+            (supportedKraftVersions.get().minVersion() <= 1 &&
+             supportedKraftVersions.get().maxVersion() >= 1);
+    }
+
+    private void completeAddVoter(LeaderState<?> leaderState) {
+        leaderState.resetAddVoterHandlerState(Errors.NONE, null, 
Optional.empty());
+    }
+
+    private void abortAddVoter(LeaderState<?> leaderState, Errors error, 
String message) {
+        leaderState.resetAddVoterHandlerState(error, message, 
Optional.empty());
+    }
+}
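
Illustrative sketch (not part of the patch): the supported-feature entry a prospective voter
reports in its ApiVersions response and the check it must pass, mirroring validVersionRange
above; the min/max values are made up.

    import java.util.Optional;

    import org.apache.kafka.common.message.ApiVersionsResponseData;
    import org.apache.kafka.server.common.KRaftVersion;

    public final class KRaftVersionCheckExample {
        public static void main(String[] args) {
            // The kraft.version feature range as a prospective voter would report it
            ApiVersionsResponseData.SupportedFeatureKey kraftFeature =
                new ApiVersionsResponseData.SupportedFeatureKey()
                    .setName(KRaftVersion.FEATURE_NAME)
                    .setMinVersion((short) 0)
                    .setMaxVersion((short) 1);

            // Mirrors AddVoterHandler#validVersionRange: the reported range must include version 1
            Optional<ApiVersionsResponseData.SupportedFeatureKey> supported = Optional.of(kraftFeature);
            boolean reconfigSupported = supported.isPresent()
                && supported.get().minVersion() <= 1
                && supported.get().maxVersion() >= 1;
            System.out.println(reconfigSupported);   // true
        }
    }
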
diff --git 
a/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandlerState.java 
b/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandlerState.java
new file mode 100644
index 00000000000..3b303f2a873
--- /dev/null
+++ 
b/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandlerState.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.raft.internals;
+
+import org.apache.kafka.common.message.AddRaftVoterResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.utils.Timer;
+import org.apache.kafka.raft.Endpoints;
+import org.apache.kafka.raft.RaftUtil;
+
+import java.util.OptionalLong;
+import java.util.concurrent.CompletableFuture;
+
+public final class AddVoterHandlerState implements AutoCloseable {
+    private final ReplicaKey voterKey;
+    private final Endpoints voterEndpoints;
+    private final Timer timeout;
+    private final CompletableFuture<AddRaftVoterResponseData> future = new 
CompletableFuture<>();
+
+    private OptionalLong lastOffset = OptionalLong.empty();
+
+    AddVoterHandlerState(
+        ReplicaKey voterKey,
+        Endpoints voterEndpoints,
+        Timer timeout
+    ) {
+        this.voterKey = voterKey;
+        this.voterEndpoints = voterEndpoints;
+        this.timeout = timeout;
+    }
+
+    public long timeUntilOperationExpiration(long currentTimeMs) {
+        timeout.update(currentTimeMs);
+        return timeout.remainingMs();
+    }
+
+    public boolean expectingApiResponse(int replicaId) {
+        return !lastOffset.isPresent() && replicaId == voterKey.id();
+    }
+
+    public void setLastOffset(long lastOffset) {
+        if (this.lastOffset.isPresent()) {
+            throw new IllegalStateException(
+                String.format(
+                    "Cannot override last offset to %s for adding voter %s because it is " +
+                    "already set to %s",
+                    lastOffset,
+                    voterKey,
+                    this.lastOffset
+                )
+            );
+        }
+
+        this.lastOffset = OptionalLong.of(lastOffset);
+    }
+
+    public ReplicaKey voterKey() {
+        return voterKey;
+    }
+
+    public Endpoints voterEndpoints() {
+        return voterEndpoints;
+    }
+
+    public OptionalLong lastOffset() {
+        return lastOffset;
+    }
+
+    public CompletableFuture<AddRaftVoterResponseData> future() {
+        return future;
+    }
+
+    @Override
+    public void close() {
+        
future.complete(RaftUtil.addVoterResponse(Errors.NOT_LEADER_OR_FOLLOWER, null));
+    }
+}
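
Illustrative sketch (not part of the patch) of the intended lifecycle of AddVoterHandlerState
as driven by AddVoterHandler; the constructor is package-private, so this only compiles inside
org.apache.kafka.raft.internals, and the voter id, offset and timeout are made up.

    package org.apache.kafka.raft.internals;

    import org.apache.kafka.common.Uuid;
    import org.apache.kafka.common.utils.Time;
    import org.apache.kafka.raft.Endpoints;

    final class AddVoterHandlerStateExample {
        public static void main(String[] args) {
            AddVoterHandlerState state = new AddVoterHandlerState(
                ReplicaKey.of(4, Uuid.randomUuid()),
                Endpoints.empty(),                // placeholder; normally the voter's listeners
                Time.SYSTEM.timer(30_000)         // the timeout returned by RequestSender#send
            );

            // Until the VotersRecord is appended, only an API_VERSIONS response from voter 4 is expected
            System.out.println(state.expectingApiResponse(4));   // true

            // Record the offset of the appended VotersRecord exactly once
            state.setLastOffset(200L);
            System.out.println(state.expectingApiResponse(4));   // false, now waiting on the HWM

            // The pending RPC completes when the record commits, or with NOT_LEADER_OR_FOLLOWER on close()
            state.close();
            System.out.println(state.future().isDone());         // true
        }
    }
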
diff --git 
a/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java 
b/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
index 09204812ae6..ff4de9b4a56 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.memory.MemoryPool;
 import org.apache.kafka.common.message.LeaderChangeMessage;
 import org.apache.kafka.common.message.SnapshotFooterRecord;
 import org.apache.kafka.common.message.SnapshotHeaderRecord;
+import org.apache.kafka.common.message.VotersRecord;
 import org.apache.kafka.common.protocol.ObjectSerializationCache;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.MutableRecordBatch;
@@ -225,8 +226,9 @@ public class BatchAccumulator<T> implements Closeable {
      * @param valueCreator a function that uses the passed buffer to create 
the control
      *        batch that will be appended. The memory records returned must 
contain one
      *        control batch and that control batch have at least one record.
+     * @return the last offset of the records created
      */
-    public void appendControlMessages(MemoryRecordsCreator valueCreator) {
+    public long appendControlMessages(MemoryRecordsCreator valueCreator) {
         appendLock.lock();
         try {
             ByteBuffer buffer = memoryPool.tryAllocate(maxBatchSize);
@@ -260,6 +262,8 @@ public class BatchAccumulator<T> implements Closeable {
             } else {
                 throw new IllegalStateException("Could not allocate buffer for 
the control record");
             }
+
+            return nextOffset - 1;
         } finally {
             appendLock.unlock();
         }
@@ -303,6 +307,30 @@ public class BatchAccumulator<T> implements Closeable {
         return numberOfRecords;
     }
 
+    /**
+     * Append a {@link VotersRecord} record to the batch
+     *
+     * @param voters the record to append
+     * @param currentTimestamp the current time in milliseconds
+     * @return the last offset of the records created
+     * @throws IllegalStateException on failure to allocate a buffer for the 
record
+     */
+    public long appendVotersRecord(
+        VotersRecord voters,
+        long currentTimestamp
+    ) {
+        return appendControlMessages((baseOffset, epoch, compression, buffer) 
->
+            MemoryRecords.withVotersRecord(
+                baseOffset,
+                currentTimestamp,
+                epoch,
+                buffer,
+                voters
+            )
+        );
+    }
+
     /**
      * Append a {@link LeaderChangeMessage} record to the batch
      *
diff --git 
a/raft/src/main/java/org/apache/kafka/raft/internals/DefaultRequestSender.java 
b/raft/src/main/java/org/apache/kafka/raft/internals/DefaultRequestSender.java
new file mode 100644
index 00000000000..0cee3c255d1
--- /dev/null
+++ 
b/raft/src/main/java/org/apache/kafka/raft/internals/DefaultRequestSender.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.raft.NetworkChannel;
+import org.apache.kafka.raft.RaftMessageQueue;
+import org.apache.kafka.raft.RaftRequest;
+import org.apache.kafka.raft.RaftResponse;
+import org.apache.kafka.raft.RaftUtil;
+import org.apache.kafka.raft.RequestManager;
+
+import org.slf4j.Logger;
+
+import java.util.OptionalLong;
+import java.util.function.Supplier;
+
+public final class DefaultRequestSender implements RequestSender {
+    private final RequestManager requestManager;
+    private final NetworkChannel channel;
+    private final RaftMessageQueue messageQueue;
+    private final Logger logger;
+
+    public DefaultRequestSender(
+        RequestManager requestManager,
+        NetworkChannel channel,
+        RaftMessageQueue messageQueue,
+        LogContext logContext
+    ) {
+        this.requestManager = requestManager;
+        this.channel = channel;
+        this.messageQueue = messageQueue;
+        this.logger = logContext.logger(DefaultRequestSender.class);
+    }
+
+    @Override
+    public ListenerName listenerName() {
+        return channel.listenerName();
+    }
+
+    @Override
+    public OptionalLong send(
+        Node destination,
+        Supplier<ApiMessage> requestSupplier,
+        long currentTimeMs
+    ) {
+        if (requestManager.isBackingOff(destination, currentTimeMs)) {
+            long remainingBackoffMs = 
requestManager.remainingBackoffMs(destination, currentTimeMs);
+            logger.debug("Connection for {} is backing off for {} ms", 
destination, remainingBackoffMs);
+            return OptionalLong.empty();
+        }
+
+        if (!requestManager.isReady(destination, currentTimeMs)) {
+            long remainingMs = 
requestManager.remainingRequestTimeMs(destination, currentTimeMs);
+            logger.debug("Connection for {} has a pending request for {} ms", 
destination, remainingMs);
+            return OptionalLong.empty();
+        }
+
+        int correlationId = channel.newCorrelationId();
+        ApiMessage request = requestSupplier.get();
+
+        RaftRequest.Outbound requestMessage = new RaftRequest.Outbound(
+            correlationId,
+            request,
+            destination,
+            currentTimeMs
+        );
+
+        requestMessage.completion.whenComplete((response, exception) -> {
+            if (exception != null) {
+                ApiKeys api = ApiKeys.forId(request.apiKey());
+                Errors error = Errors.forException(exception);
+                ApiMessage errorResponse = RaftUtil.errorResponse(api, error);
+
+                response = new RaftResponse.Inbound(
+                    correlationId,
+                    errorResponse,
+                    destination
+                );
+            }
+
+            messageQueue.add(response);
+        });
+
+        requestManager.onRequestSent(destination, correlationId, 
currentTimeMs);
+        channel.send(requestMessage);
+        logger.trace("Sent outbound request: {}", requestMessage);
+
+        return 
OptionalLong.of(requestManager.remainingRequestTimeMs(destination, 
currentTimeMs));
+    }
+}
diff --git 
a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java
 
b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java
index 59663b54204..6c84935b23f 100644
--- 
a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java
+++ 
b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java
@@ -65,7 +65,7 @@ public final class KRaftControlRecordStateMachine {
     //
     // 2. The read operations lastVoterSet, voterSetAtOffset and 
kraftVersionAtOffset read
     // the nextOffset first before reading voterSetHistory or 
kraftVersionHistory
-    private volatile long nextOffset = 0;
+    private volatile long nextOffset = -1;
 
     /**
      * Constructs an internal log listener
@@ -138,6 +138,15 @@ public final class KRaftControlRecordStateMachine {
         }
     }
 
+    /**
+     * Return the latest entry for the set of voters.
+     */
+    public Optional<LogHistory.Entry<VoterSet>> lastVoterSetEntry() {
+        synchronized (voterSetHistory) {
+            return voterSetHistory.lastEntry();
+        }
+    }
+
     /**
      * Returns the offset of the last voter set.
      */
@@ -221,7 +230,7 @@ public final class KRaftControlRecordStateMachine {
     }
 
     private void maybeLoadSnapshot() {
-        if ((nextOffset == 0 || nextOffset < log.startOffset()) && 
log.latestSnapshot().isPresent()) {
+        if ((nextOffset == -1 || nextOffset < log.startOffset()) && 
log.latestSnapshot().isPresent()) {
             RawSnapshotReader rawSnapshot = log.latestSnapshot().get();
             // Clear the current state
             synchronized (kraftVersionHistory) {
@@ -254,6 +263,10 @@ public final class KRaftControlRecordStateMachine {
 
                 nextOffset = reader.lastContainedLogOffset() + 1;
             }
+        } else if (nextOffset == -1) {
+            // Listener just started and there are no snapshots; set the 
nextOffset to
+            // 0 to start reading the log
+            nextOffset = 0;
         }
     }
 
diff --git 
a/raft/src/main/java/org/apache/kafka/raft/internals/RequestSender.java 
b/raft/src/main/java/org/apache/kafka/raft/internals/RequestSender.java
new file mode 100644
index 00000000000..d30b259c7d8
--- /dev/null
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/RequestSender.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.ApiMessage;
+
+import java.util.OptionalLong;
+import java.util.function.Supplier;
+
+/**
+ * Interface for sending KRaft requests.
+ *
+ * Responsible for managing the connection state and sending requests when the connection is
+ * available.
+ */
+interface RequestSender {
+    /**
+     * The name of the listener used for sending requests.
+     *
+     * This is generally the default (first) listener.
+     */
+    ListenerName listenerName();
+
+    /**
+     * Send a KRaft request to the destination.
+     *
+     * @param destination the destination for the request
+     * @param requestSupplier the default constructor for the request
+     * @param currentTimeMs the current time
+     * @return the request timeout if the request was sent; otherwise {@code OptionalLong.empty()}
+     */
+    OptionalLong send(
+        Node destination,
+        Supplier<ApiMessage> requestSupplier,
+        long currentTimeMs
+    );
+}
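
    For context, a hedged usage sketch of the contract documented above: an empty OptionalLong means the request was not sent (for example, connection backoff or an in-flight request), so the caller retries on a later poll. AddVoterProbe and maybeSendApiVersions are hypothetical names; the sketch assumes it lives in org.apache.kafka.raft.internals, since RequestSender is package-private, and only uses existing types such as Node and ApiVersionsRequestData.

        package org.apache.kafka.raft.internals;

        import org.apache.kafka.common.Node;
        import org.apache.kafka.common.message.ApiVersionsRequestData;

        import java.util.OptionalLong;

        // Hypothetical caller; not part of this patch.
        final class AddVoterProbe {
            private final RequestSender sender;

            AddVoterProbe(RequestSender sender) {
                this.sender = sender;
            }

            // Returns true if the API_VERSIONS request was handed to the channel.
            boolean maybeSendApiVersions(Node newVoterNode, long currentTimeMs) {
                OptionalLong requestTimeoutMs = sender.send(
                    newVoterNode,
                    ApiVersionsRequestData::new,
                    currentTimeMs
                );
                // Empty means the connection was not ready; try again on a later poll.
                return requestTimeoutMs.isPresent();
            }
        }
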
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java
index 2f961897982..d5de736f7b4 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java
@@ -363,6 +363,14 @@ public final class VoterSet {
                 supportedKRaftVersion
             );
         }
+
+        public static VoterNode of(
+            ReplicaKey voterKey,
+            Endpoints listeners,
+            SupportedVersionRange supportedKRaftVersion
+        ) {
+            return new VoterNode(voterKey, listeners, supportedKRaftVersion);
+        }
     }
 
     /**
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSetHistory.java b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSetHistory.java
index 1b630c3e22c..4732d6e799f 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSetHistory.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSetHistory.java
@@ -92,6 +92,13 @@ public final class VoterSetHistory {
             .orElseThrow(() -> new IllegalStateException("No voter set found"));
     }
 
+    /**
+     * Return the latest entry for the set of voters.
+     */
+    public Optional<LogHistory.Entry<VoterSet>> lastEntry() {
+        return votersHistory.lastEntry();
+    }
+
     /**
      * Returns the offset of the last voter set stored in the partition history.
      *
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
index 84a9af918b6..5cdcfe5132c 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
@@ -16,12 +16,16 @@
  */
 package org.apache.kafka.raft;
 
+import org.apache.kafka.common.Node;
 import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.feature.SupportedVersionRange;
+import org.apache.kafka.common.message.ApiVersionsResponseData;
 import org.apache.kafka.common.message.KRaftVersionRecord;
 import org.apache.kafka.common.message.LeaderChangeMessage;
 import org.apache.kafka.common.message.SnapshotFooterRecord;
 import org.apache.kafka.common.message.SnapshotHeaderRecord;
 import org.apache.kafka.common.message.VotersRecord;
+import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.ControlRecordType;
 import org.apache.kafka.common.record.ControlRecordUtils;
@@ -35,18 +39,22 @@ import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.raft.internals.ReplicaKey;
 import org.apache.kafka.raft.internals.VoterSet;
 import org.apache.kafka.raft.internals.VoterSetTest;
+import org.apache.kafka.server.common.KRaftVersion;
 import org.apache.kafka.snapshot.RecordsSnapshotReader;
 import org.apache.kafka.snapshot.SnapshotReader;
 import org.apache.kafka.snapshot.SnapshotWriterReaderTest;
 
 import org.junit.jupiter.api.Test;
 
+import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
 import java.util.OptionalInt;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Stream;
 
 import static org.apache.kafka.raft.KafkaRaftClientTest.replicaKey;
@@ -55,12 +63,14 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+// KAFKA-16537 will add tests for testAddVoterWithPendingRemoveVoter
+// and testRemoveVoterWithPendingAddVoter
 public class KafkaRaftClientReconfigTest {
 
     @Test
     public void testLeaderWritesBootstrapRecords() throws Exception {
-        ReplicaKey local = replicaKey(0, true);
-        ReplicaKey follower = replicaKey(1, true);
+        ReplicaKey local = replicaKey(randomReplicaId(), true);
+        ReplicaKey follower = replicaKey(local.id() + 1, true);
 
         VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower));
 
@@ -134,8 +144,8 @@ public class KafkaRaftClientReconfigTest {
 
     @Test
     public void testBootstrapCheckpointIsNotReturnedOnFetch() throws Exception 
{
-        ReplicaKey local = replicaKey(0, true);
-        ReplicaKey follower = replicaKey(1, true);
+        ReplicaKey local = replicaKey(randomReplicaId(), true);
+        ReplicaKey follower = replicaKey(local.id() + 1, true);
 
         VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower));
 
@@ -164,13 +174,13 @@ public class KafkaRaftClientReconfigTest {
 
     @Test
     public void testLeaderDoesNotBootstrapRecordsWithKraftVersion0() throws 
Exception {
-        ReplicaKey local = replicaKey(0, true);
-        ReplicaKey follower = replicaKey(1, true);
+        ReplicaKey local = replicaKey(randomReplicaId(), true);
+        ReplicaKey follower = replicaKey(local.id() + 1, true);
 
         VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower));
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(local.id(), local.directoryId().get())
-            .withStaticVoters(voters.voterIds())
+            .withStaticVoters(voters)
             .withBootstrapSnapshot(Optional.empty())
             .withUnknownLeader(0)
             .build();
@@ -226,8 +236,8 @@ public class KafkaRaftClientReconfigTest {
 
     @Test
     public void testFollowerDoesNotRequestLeaderBootstrapSnapshot() throws 
Exception {
-        ReplicaKey local = replicaKey(0, true);
-        ReplicaKey leader = replicaKey(1, true);
+        ReplicaKey local = replicaKey(randomReplicaId(), true);
+        ReplicaKey leader = replicaKey(local.id() + 1, true);
         int epoch = 1;
 
         VoterSet voters = VoterSetTest.voterSet(Stream.of(local, leader));
@@ -256,9 +266,9 @@ public class KafkaRaftClientReconfigTest {
 
     @Test
     public void testFollowerReadsKRaftBootstrapRecords() throws Exception {
-        ReplicaKey local = replicaKey(0, true);
-        ReplicaKey leader = replicaKey(1, true);
-        ReplicaKey follower = replicaKey(2, true);
+        ReplicaKey local = replicaKey(randomReplicaId(), true);
+        ReplicaKey leader = replicaKey(local.id() + 1, true);
+        ReplicaKey follower = replicaKey(local.id() + 2, true);
         VoterSet voterSet = VoterSetTest.voterSet(Stream.of(local, leader));
         int epoch = 5;
 
@@ -324,6 +334,609 @@ public class KafkaRaftClientReconfigTest {
         assertTrue(context.client.quorum().isVoter(follower));
     }
 
+    @Test
+    public void testAddVoter() throws Exception {
+        ReplicaKey local = replicaKey(randomReplicaId(), true);
+        ReplicaKey follower = replicaKey(local.id() + 1, true);
+
+        VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower));
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(local.id(), local.directoryId().get())
+            .withKip853Rpc(true)
+            .withBootstrapSnapshot(Optional.of(voters))
+            .withUnknownLeader(3)
+            .build();
+
+        context.becomeLeader();
+        int epoch = context.currentEpoch();
+
+        ReplicaKey newVoter = replicaKey(local.id() + 2, true);
+        InetSocketAddress newAddress = InetSocketAddress.createUnresolved(
+            "localhost",
+            9990 + newVoter.id()
+        );
+        Endpoints newListeners = Endpoints.fromInetSocketAddresses(
+            Collections.singletonMap(context.channel.listenerName(), 
newAddress)
+        );
+
+        // Show that the new voter is not currently a voter
+        assertFalse(context.client.quorum().isVoter(newVoter));
+
+        // Establish a HWM and fence previous leaders
+        context.deliverRequest(
+            context.fetchRequest(epoch, follower, 
context.log.endOffset().offset(), epoch, 0)
+        );
+        context.pollUntilResponse();
+        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, 
OptionalInt.of(local.id()));
+
+        // Catch up the new voter to the leader's LEO
+        context.deliverRequest(
+            context.fetchRequest(epoch, newVoter, 
context.log.endOffset().offset(), epoch, 0)
+        );
+        context.pollUntilResponse();
+        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, 
OptionalInt.of(local.id()));
+
+        // Attempt to add new voter to the quorum
+        context.deliverRequest(context.addVoterRequest(Integer.MAX_VALUE, 
newVoter, newListeners));
+
+        // Leader should send an API_VERSIONS request to the new voter's 
endpoint
+        context.pollUntilRequest();
+        RaftRequest.Outbound apiVersionRequest = 
context.assertSentApiVersionsRequest();
+        assertEquals(
+            new Node(newVoter.id(), newAddress.getHostString(), 
newAddress.getPort()),
+            apiVersionRequest.destination()
+        );
+
+        // Reply with API_VERSIONS response with supported kraft.version
+        context.deliverResponse(
+            apiVersionRequest.correlationId(),
+            apiVersionRequest.destination(),
+            apiVersionsResponse(Errors.NONE)
+        );
+
+        // Handle the API_VERSIONS response
+        context.client.poll();
+        // Append new VotersRecord to log
+        context.client.poll();
+        // The new voter is now a voter after writing the VotersRecord to the 
log
+        assertTrue(context.client.quorum().isVoter(newVoter));
+
+        // Send a FETCH to increase the HWM and commit the new voter set
+        context.deliverRequest(
+            context.fetchRequest(epoch, follower, 
context.log.endOffset().offset(), epoch, 0)
+        );
+        context.pollUntilResponse();
+        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, 
OptionalInt.of(local.id()));
+
+        // Expect reply for AddVoter request
+        context.pollUntilResponse();
+        context.assertSentAddVoterResponse(Errors.NONE);
+    }
+
+    @Test
+    void testAddVoterInvalidClusterId() throws Exception {
+        ReplicaKey local = replicaKey(randomReplicaId(), true);
+        ReplicaKey follower = replicaKey(local.id() + 1, true);
+
+        VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower));
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(local.id(), local.directoryId().get())
+            .withKip853Rpc(true)
+            .withBootstrapSnapshot(Optional.of(voters))
+            .withUnknownLeader(3)
+            .build();
+
+        context.becomeLeader();
+
+        ReplicaKey newVoter = replicaKey(local.id() + 2, true);
+        InetSocketAddress newAddress = InetSocketAddress.createUnresolved(
+            "localhost",
+            9990 + newVoter.id()
+        );
+        Endpoints newListeners = Endpoints.fromInetSocketAddresses(
+            Collections.singletonMap(context.channel.listenerName(), 
newAddress)
+        );
+
+        // empty cluster id is rejected
+        context.deliverRequest(context.addVoterRequest("", Integer.MAX_VALUE, 
newVoter, newListeners));
+        context.pollUntilResponse();
+        context.assertSentAddVoterResponse(Errors.INCONSISTENT_CLUSTER_ID);
+
+        // invalid cluster id is rejected
+        context.deliverRequest(context.addVoterRequest("invalid-uuid", 
Integer.MAX_VALUE, newVoter, newListeners));
+        context.pollUntilResponse();
+        context.assertSentAddVoterResponse(Errors.INCONSISTENT_CLUSTER_ID);
+    }
+
+    @Test
+    void testAddVoterToNotLeader() throws Exception {
+        ReplicaKey local = replicaKey(randomReplicaId(), true);
+        ReplicaKey follower = replicaKey(local.id() + 1, true);
+
+        VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower));
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(local.id(), local.directoryId().get())
+            .withKip853Rpc(true)
+            .withBootstrapSnapshot(Optional.of(voters))
+            .withUnknownLeader(3)
+            .build();
+
+        ReplicaKey newVoter = replicaKey(local.id() + 2, true);
+        InetSocketAddress newAddress = InetSocketAddress.createUnresolved(
+            "localhost",
+            9990 + newVoter.id()
+        );
+        Endpoints newListeners = Endpoints.fromInetSocketAddresses(
+            Collections.singletonMap(context.channel.listenerName(), 
newAddress)
+        );
+
+        // Attempt to add new voter to the quorum
+        context.deliverRequest(context.addVoterRequest(Integer.MAX_VALUE, 
newVoter, newListeners));
+        context.pollUntilResponse();
+        context.assertSentAddVoterResponse(Errors.NOT_LEADER_OR_FOLLOWER);
+    }
+
+    @Test
+    void testAddVoterWithMissingDefaultListener() throws Exception {
+        ReplicaKey local = replicaKey(randomReplicaId(), true);
+        ReplicaKey follower = replicaKey(local.id() + 1, true);
+
+        VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower));
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(local.id(), local.directoryId().get())
+            .withKip853Rpc(true)
+            .withBootstrapSnapshot(Optional.of(voters))
+            .withUnknownLeader(3)
+            .build();
+
+        context.becomeLeader();
+
+        ReplicaKey newVoter = replicaKey(local.id() + 2, true);
+        InetSocketAddress newAddress = InetSocketAddress.createUnresolved(
+            "localhost",
+            9990 + newVoter.id()
+        );
+        Endpoints newListeners = Endpoints.fromInetSocketAddresses(
+            
Collections.singletonMap(ListenerName.normalised("not_the_default_listener"), 
newAddress)
+        );
+
+        // Attempt to add new voter to the quorum
+        context.deliverRequest(context.addVoterRequest(Integer.MAX_VALUE, 
newVoter, newListeners));
+        context.pollUntilResponse();
+        context.assertSentAddVoterResponse(Errors.INVALID_REQUEST);
+    }
+
+    @Test
+    void testAddVoterWithPendingAddVoter() throws Exception {
+        ReplicaKey local = replicaKey(randomReplicaId(), true);
+        ReplicaKey follower = replicaKey(local.id() + 1, true);
+
+        VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower));
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(local.id(), local.directoryId().get())
+            .withKip853Rpc(true)
+            .withBootstrapSnapshot(Optional.of(voters))
+            .withUnknownLeader(3)
+            .build();
+
+        context.becomeLeader();
+        int epoch = context.currentEpoch();
+
+        ReplicaKey newVoter = replicaKey(local.id() + 2, true);
+        InetSocketAddress newAddress = InetSocketAddress.createUnresolved(
+            "localhost",
+            9990 + newVoter.id()
+        );
+        Endpoints newListeners = Endpoints.fromInetSocketAddresses(
+            Collections.singletonMap(context.channel.listenerName(), 
newAddress)
+        );
+
+        // Establish a HWM and fence previous leaders
+        context.deliverRequest(
+            context.fetchRequest(epoch, follower, 
context.log.endOffset().offset(), epoch, 0)
+        );
+        context.pollUntilResponse();
+        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, 
OptionalInt.of(local.id()));
+
+        // Catch up the new voter to the leader's LEO
+        context.deliverRequest(
+            context.fetchRequest(epoch, newVoter, 
context.log.endOffset().offset(), epoch, 0)
+        );
+        context.pollUntilResponse();
+        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, 
OptionalInt.of(local.id()));
+
+        // Attempt to add new voter to the quorum
+        context.deliverRequest(context.addVoterRequest(Integer.MAX_VALUE, 
newVoter, newListeners));
+
+        // Attempting to add another voter should be an error
+        ReplicaKey anotherNewVoter = replicaKey(local.id() + 3, true);
+        InetSocketAddress anotherNewAddress = 
InetSocketAddress.createUnresolved(
+            "localhost",
+            9990 + anotherNewVoter.id()
+        );
+        Endpoints anotherNewListeners = Endpoints.fromInetSocketAddresses(
+            Collections.singletonMap(context.channel.listenerName(), 
anotherNewAddress)
+        );
+        context.deliverRequest(context.addVoterRequest(Integer.MAX_VALUE, 
anotherNewVoter, anotherNewListeners));
+        context.pollUntilResponse();
+        context.assertSentAddVoterResponse(Errors.REQUEST_TIMED_OUT);
+    }
+
+    @Test
+    void testAddVoterWithoutFencedPreviousLeaders() throws Exception {
+        ReplicaKey local = replicaKey(randomReplicaId(), true);
+        ReplicaKey follower = replicaKey(local.id() + 1, true);
+
+        VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower));
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(local.id(), local.directoryId().get())
+            .withKip853Rpc(true)
+            .withBootstrapSnapshot(Optional.of(voters))
+            .withUnknownLeader(3)
+            .build();
+
+        context.becomeLeader();
+        int epoch = context.currentEpoch();
+
+        ReplicaKey newVoter = replicaKey(local.id() + 2, true);
+        InetSocketAddress newAddress = InetSocketAddress.createUnresolved(
+            "localhost",
+            9990 + newVoter.id()
+        );
+        Endpoints newListeners = Endpoints.fromInetSocketAddresses(
+            Collections.singletonMap(context.channel.listenerName(), 
newAddress)
+        );
+
+        // Catch up the new voter to the leader's LEO
+        context.deliverRequest(
+            context.fetchRequest(epoch, newVoter, 
context.log.endOffset().offset(), epoch, 0)
+        );
+        context.pollUntilResponse();
+        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, 
OptionalInt.of(local.id()));
+
+        // Attempt to add new voter to the quorum
+        context.deliverRequest(context.addVoterRequest(Integer.MAX_VALUE, 
newVoter, newListeners));
+        context.pollUntilResponse();
+        context.assertSentAddVoterResponse(Errors.REQUEST_TIMED_OUT);
+    }
+
+    @Test
+    void testAddVoterWithKraftVersion0() throws Exception {
+        ReplicaKey local = replicaKey(randomReplicaId(), true);
+        ReplicaKey follower = replicaKey(local.id() + 1, true);
+
+        VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower));
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(local.id(), local.directoryId().get())
+            .withKip853Rpc(true)
+            .withStaticVoters(voters)
+            .withUnknownLeader(3)
+            .build();
+
+        context.becomeLeader();
+        int epoch = context.currentEpoch();
+
+        ReplicaKey newVoter = replicaKey(local.id() + 2, true);
+        InetSocketAddress newAddress = InetSocketAddress.createUnresolved(
+            "localhost",
+            9990 + newVoter.id()
+        );
+        Endpoints newListeners = Endpoints.fromInetSocketAddresses(
+            Collections.singletonMap(context.channel.listenerName(), 
newAddress)
+        );
+
+        // Establish a HWM and fence previous leaders
+        context.deliverRequest(
+            context.fetchRequest(epoch, follower, 
context.log.endOffset().offset(), epoch, 0)
+        );
+        context.pollUntilResponse();
+        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, 
OptionalInt.of(local.id()));
+
+        // Catch up the new voter to the leader's LEO
+        context.deliverRequest(
+            context.fetchRequest(epoch, newVoter, 
context.log.endOffset().offset(), epoch, 0)
+        );
+        context.pollUntilResponse();
+        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, 
OptionalInt.of(local.id()));
+
+        // Attempt to add new voter to the quorum
+        context.deliverRequest(context.addVoterRequest(Integer.MAX_VALUE, 
newVoter, newListeners));
+        context.pollUntilResponse();
+        context.assertSentAddVoterResponse(Errors.UNSUPPORTED_VERSION);
+    }
+
+    @Test
+    void testAddVoterWithExistingVoter() throws Exception {
+        ReplicaKey local = replicaKey(randomReplicaId(), true);
+        ReplicaKey follower = replicaKey(local.id() + 1, true);
+
+        VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower));
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(local.id(), local.directoryId().get())
+            .withKip853Rpc(true)
+            .withBootstrapSnapshot(Optional.of(voters))
+            .withUnknownLeader(3)
+            .build();
+
+        context.becomeLeader();
+        int epoch = context.currentEpoch();
+
+        ReplicaKey newVoter = replicaKey(follower.id(), true);
+        InetSocketAddress newAddress = InetSocketAddress.createUnresolved(
+            "localhost",
+            9990 + newVoter.id()
+        );
+        Endpoints newListeners = Endpoints.fromInetSocketAddresses(
+            Collections.singletonMap(context.channel.listenerName(), 
newAddress)
+        );
+
+        // Establish a HWM and fence previous leaders
+        context.deliverRequest(
+            context.fetchRequest(epoch, follower, 
context.log.endOffset().offset(), epoch, 0)
+        );
+        context.pollUntilResponse();
+        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, 
OptionalInt.of(local.id()));
+
+        // Catch up the new voter to the leader's LEO
+        context.deliverRequest(
+            context.fetchRequest(epoch, newVoter, 
context.log.endOffset().offset(), epoch, 0)
+        );
+        context.pollUntilResponse();
+        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, 
OptionalInt.of(local.id()));
+
+        // Attempt to add new voter with the same id as an existing voter
+        context.deliverRequest(context.addVoterRequest(Integer.MAX_VALUE, 
newVoter, newListeners));
+        context.pollUntilResponse();
+        context.assertSentAddVoterResponse(Errors.DUPLICATE_VOTER);
+    }
+
+    @Test
+    void testAddVoterTimeout() throws Exception {
+        ReplicaKey local = replicaKey(randomReplicaId(), true);
+        ReplicaKey follower = replicaKey(local.id() + 1, true);
+
+        VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower));
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(local.id(), local.directoryId().get())
+            .withKip853Rpc(true)
+            .withBootstrapSnapshot(Optional.of(voters))
+            .withUnknownLeader(3)
+            .build();
+
+        context.becomeLeader();
+        int epoch = context.currentEpoch();
+
+        ReplicaKey newVoter = replicaKey(local.id() + 2, true);
+        InetSocketAddress newAddress = InetSocketAddress.createUnresolved(
+            "localhost",
+            9990 + newVoter.id()
+        );
+        Endpoints newListeners = Endpoints.fromInetSocketAddresses(
+            Collections.singletonMap(context.channel.listenerName(), 
newAddress)
+        );
+
+        // Show that the new voter is not currently a voter
+        assertFalse(context.client.quorum().isVoter(newVoter));
+
+        // Establish a HWM and fence previous leaders
+        context.deliverRequest(
+            context.fetchRequest(epoch, follower, 
context.log.endOffset().offset(), epoch, 0)
+        );
+        context.pollUntilResponse();
+        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, 
OptionalInt.of(local.id()));
+
+        // Catch up the new voter to the leader's LEO
+        context.deliverRequest(
+            context.fetchRequest(epoch, newVoter, 
context.log.endOffset().offset(), epoch, 0)
+        );
+        context.pollUntilResponse();
+        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, 
OptionalInt.of(local.id()));
+
+        // Attempt to add new voter to the quorum
+        context.deliverRequest(context.addVoterRequest(Integer.MAX_VALUE, 
newVoter, newListeners));
+
+        // Leader should send an API_VERSIONS request to the new voter's 
endpoint
+        context.pollUntilRequest();
+        RaftRequest.Outbound apiVersionRequest = 
context.assertSentApiVersionsRequest();
+        assertEquals(
+            new Node(newVoter.id(), newAddress.getHostString(), 
newAddress.getPort()),
+            apiVersionRequest.destination()
+        );
+
+        // Reply with API_VERSIONS response with supported kraft.version
+        context.deliverResponse(
+            apiVersionRequest.correlationId(),
+            apiVersionRequest.destination(),
+            apiVersionsResponse(Errors.NONE)
+        );
+
+        // Handle the API_VERSIONS response
+        context.client.poll();
+        // Append new VotersRecord to log
+        context.client.poll();
+        // The new voter is now a voter after writing the VotersRecord to the 
log
+        assertTrue(context.client.quorum().isVoter(newVoter));
+
+        context.time.sleep(context.requestTimeoutMs());
+
+        // Expect the AddVoter RPC to timeout
+        context.pollUntilResponse();
+        context.assertSentAddVoterResponse(Errors.REQUEST_TIMED_OUT);
+    }
+
+    @Test
+    void testAddVoterWithApiVersionsFromIncorrectNode() throws Exception {
+        ReplicaKey local = replicaKey(randomReplicaId(), true);
+        ReplicaKey follower = replicaKey(local.id() + 1, true);
+
+        VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower));
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(local.id(), local.directoryId().get())
+            .withKip853Rpc(true)
+            .withBootstrapSnapshot(Optional.of(voters))
+            .withUnknownLeader(3)
+            .build();
+
+        context.becomeLeader();
+        int epoch = context.currentEpoch();
+
+        ReplicaKey newVoter = replicaKey(local.id() + 2, true);
+        InetSocketAddress newAddress = InetSocketAddress.createUnresolved(
+            "localhost",
+            9990 + newVoter.id()
+        );
+        Endpoints newListeners = Endpoints.fromInetSocketAddresses(
+            Collections.singletonMap(context.channel.listenerName(), 
newAddress)
+        );
+
+        // Establish a HWM and fence previous leaders
+        context.deliverRequest(
+            context.fetchRequest(epoch, follower, 
context.log.endOffset().offset(), epoch, 0)
+        );
+        context.pollUntilResponse();
+        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, 
OptionalInt.of(local.id()));
+
+        // Catch up the new voter to the leader's LEO
+        context.deliverRequest(
+            context.fetchRequest(epoch, newVoter, 
context.log.endOffset().offset(), epoch, 0)
+        );
+        context.pollUntilResponse();
+        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, 
OptionalInt.of(local.id()));
+
+        // Attempt to add new voter to the quorum
+        context.deliverRequest(context.addVoterRequest(Integer.MAX_VALUE, 
newVoter, newListeners));
+
+        // Leader should send an API_VERSIONS request to the new voter's 
endpoint
+        context.pollUntilRequest();
+        RaftRequest.Outbound apiVersionRequest = 
context.assertSentApiVersionsRequest();
+        assertEquals(
+            new Node(newVoter.id(), newAddress.getHostString(), 
newAddress.getPort()),
+            apiVersionRequest.destination()
+        );
+
+        // Reply with API_VERSIONS response with supported kraft.version
+        context.deliverResponse(
+            apiVersionRequest.correlationId(),
+            apiVersionRequest.destination(),
+            apiVersionsResponse(Errors.INVALID_REQUEST)
+        );
+        context.pollUntilResponse();
+        context.assertSentAddVoterResponse(Errors.REQUEST_TIMED_OUT);
+    }
+
+    @Test
+    void testAddVoterInvalidFeatureVersion() throws Exception {
+        ReplicaKey local = replicaKey(randomReplicaId(), true);
+        ReplicaKey follower = replicaKey(local.id() + 1, true);
+
+        VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower));
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(local.id(), local.directoryId().get())
+            .withKip853Rpc(true)
+            .withBootstrapSnapshot(Optional.of(voters))
+            .withUnknownLeader(3)
+            .build();
+
+        context.becomeLeader();
+        int epoch = context.currentEpoch();
+
+        ReplicaKey newVoter = replicaKey(local.id() + 2, true);
+        InetSocketAddress newAddress = InetSocketAddress.createUnresolved(
+            "localhost",
+            9990 + newVoter.id()
+        );
+        Endpoints newListeners = Endpoints.fromInetSocketAddresses(
+            Collections.singletonMap(context.channel.listenerName(), 
newAddress)
+        );
+
+        // Establish a HWM and fence previous leaders
+        context.deliverRequest(
+            context.fetchRequest(epoch, follower, 
context.log.endOffset().offset(), epoch, 0)
+        );
+        context.pollUntilResponse();
+        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, 
OptionalInt.of(local.id()));
+
+        // Catch up the new voter to the leader's LEO
+        context.deliverRequest(
+            context.fetchRequest(epoch, newVoter, 
context.log.endOffset().offset(), epoch, 0)
+        );
+        context.pollUntilResponse();
+        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, 
OptionalInt.of(local.id()));
+
+        // Attempt to add new voter to the quorum
+        context.deliverRequest(context.addVoterRequest(Integer.MAX_VALUE, 
newVoter, newListeners));
+
+        // Leader should send an API_VERSIONS request to the new voter's 
endpoint
+        context.pollUntilRequest();
+        RaftRequest.Outbound apiVersionRequest = 
context.assertSentApiVersionsRequest();
+        assertEquals(
+            new Node(newVoter.id(), newAddress.getHostString(), 
newAddress.getPort()),
+            apiVersionRequest.destination()
+        );
+
+        // Reply with API_VERSIONS response with supported kraft.version
+        context.deliverResponse(
+            apiVersionRequest.correlationId(),
+            apiVersionRequest.destination(),
+            apiVersionsResponse(Errors.NONE, new SupportedVersionRange((short) 
0))
+        );
+        context.pollUntilResponse();
+        context.assertSentAddVoterResponse(Errors.INVALID_REQUEST);
+    }
+
+    @Test
+    void testAddVoterWithLaggingNewVoter() throws Exception {
+        ReplicaKey local = replicaKey(randomReplicaId(), true);
+        ReplicaKey follower = replicaKey(local.id() + 1, true);
+
+        VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower));
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(local.id(), local.directoryId().get())
+            .withKip853Rpc(true)
+            .withBootstrapSnapshot(Optional.of(voters))
+            .withUnknownLeader(3)
+            .build();
+
+        context.becomeLeader();
+        int epoch = context.currentEpoch();
+
+        ReplicaKey newVoter = replicaKey(local.id() + 2, true);
+        InetSocketAddress newAddress = InetSocketAddress.createUnresolved(
+            "localhost",
+            9990 + newVoter.id()
+        );
+        Endpoints newListeners = Endpoints.fromInetSocketAddresses(
+            Collections.singletonMap(context.channel.listenerName(), 
newAddress)
+        );
+
+        // Establish a HWM and fence previous leaders
+        context.deliverRequest(
+            context.fetchRequest(epoch, follower, 
context.log.endOffset().offset(), epoch, 0)
+        );
+        context.pollUntilResponse();
+        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, 
OptionalInt.of(local.id()));
+
+        // Attempt to add new voter to the quorum
+        context.deliverRequest(context.addVoterRequest(Integer.MAX_VALUE, 
newVoter, newListeners));
+
+        // Leader should send an API_VERSIONS request to the new voter's 
endpoint
+        context.pollUntilRequest();
+        RaftRequest.Outbound apiVersionRequest = 
context.assertSentApiVersionsRequest();
+        assertEquals(
+            new Node(newVoter.id(), newAddress.getHostString(), 
newAddress.getPort()),
+            apiVersionRequest.destination()
+        );
+
+        // Reply with API_VERSIONS response with supported kraft.version
+        context.deliverResponse(
+            apiVersionRequest.correlationId(),
+            apiVersionRequest.destination(),
+            apiVersionsResponse(Errors.NONE)
+        );
+        context.pollUntilResponse();
+        context.assertSentAddVoterResponse(Errors.REQUEST_TIMED_OUT);
+    }
+
     private static void verifyVotersRecord(
         VoterSet expectedVoterSet,
         ByteBuffer recordKey,
@@ -346,4 +959,30 @@ public class KafkaRaftClientReconfigTest {
         KRaftVersionRecord kRaftVersionRecord = 
ControlRecordUtils.deserializeKRaftVersionRecord(recordValue);
         assertEquals(expectedKRaftVersion, kRaftVersionRecord.kRaftVersion());
     }
+
+    private int randomReplicaId() {
+        return ThreadLocalRandom.current().nextInt(1025);
+    }
+
+    private static ApiVersionsResponseData apiVersionsResponse(Errors error) {
+        return apiVersionsResponse(error, new SupportedVersionRange((short) 0, 
(short) 1));
+    }
+
+    private static ApiVersionsResponseData apiVersionsResponse(Errors error, 
SupportedVersionRange supportedVersions) {
+        ApiVersionsResponseData.SupportedFeatureKeyCollection 
supportedFeatures =
+            new ApiVersionsResponseData.SupportedFeatureKeyCollection(1);
+
+        if (supportedVersions.max() > 0) {
+            supportedFeatures.add(
+                new ApiVersionsResponseData.SupportedFeatureKey()
+                    .setName(KRaftVersion.FEATURE_NAME)
+                    .setMinVersion(supportedVersions.min())
+                    .setMaxVersion(supportedVersions.max())
+            );
+        }
+
+        return new ApiVersionsResponseData()
+            .setErrorCode(error.code())
+            .setSupportedFeatures(supportedFeatures);
+    }
 }
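
    As a hedged aside on the apiVersionsResponse helpers above: the tests exercise both a supported and an unsupported kraft.version feature range, so the leader is expected to check the range advertised in the API_VERSIONS response before adding the voter. Below is a standalone sketch of such a check; KRaftVersionCheck and supportsFinalizedKRaftVersion are hypothetical names and this is not the handler implementation from the patch.

        import org.apache.kafka.common.message.ApiVersionsResponseData;
        import org.apache.kafka.server.common.KRaftVersion;

        // Hypothetical validation sketch; not the leader-side handler from this patch.
        final class KRaftVersionCheck {
            static boolean supportsFinalizedKRaftVersion(
                ApiVersionsResponseData response,
                short finalizedKRaftVersion
            ) {
                for (ApiVersionsResponseData.SupportedFeatureKey feature : response.supportedFeatures()) {
                    if (feature.name().equals(KRaftVersion.FEATURE_NAME)) {
                        // The prospective voter must support the currently finalized kraft.version.
                        return feature.minVersion() <= finalizedKRaftVersion
                            && finalizedKRaftVersion <= feature.maxVersion();
                    }
                }
                return false;
            }
        }
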
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index 60147c07888..0f8f01876e1 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -21,6 +21,9 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.compress.Compression;
 import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.message.AddRaftVoterRequestData;
+import org.apache.kafka.common.message.AddRaftVoterResponseData;
+import org.apache.kafka.common.message.ApiVersionsResponseData;
 import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
 import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
 import org.apache.kafka.common.message.DescribeQuorumRequestData;
@@ -300,12 +303,16 @@ public final class RaftClientTestContext {
                     Collectors.toMap(Function.identity(), 
RaftClientTestContext::mockAddress)
                 );
 
-            this.startingVoters = Optional.of(
+            return withStaticVoters(
                 VoterSet.fromInetSocketAddresses(
                     MockNetworkChannel.LISTENER_NAME,
                     staticVoterAddressMap
                 )
             );
+        }
+
+        Builder withStaticVoters(VoterSet staticVoters) {
+            this.startingVoters = Optional.of(staticVoters);
             this.isStartingVotersStatic = true;
 
             return this;
@@ -1081,6 +1088,26 @@ public final class RaftClientTestContext {
         return result;
     }
 
+    RaftRequest.Outbound assertSentApiVersionsRequest() {
+        List<RaftRequest.Outbound> sentRequests = 
channel.drainSentRequests(Optional.of(ApiKeys.API_VERSIONS));
+        assertEquals(1, sentRequests.size());
+
+        return sentRequests.get(0);
+    }
+
+    AddRaftVoterResponseData assertSentAddVoterResponse(Errors error) {
+        List<RaftResponse.Outbound> sentResponses = 
drainSentResponses(ApiKeys.ADD_RAFT_VOTER);
+        assertEquals(1, sentResponses.size());
+
+        RaftResponse.Outbound response = sentResponses.get(0);
+        assertInstanceOf(AddRaftVoterResponseData.class, response.data());
+
+        AddRaftVoterResponseData addVoterResponse = (AddRaftVoterResponseData) 
response.data();
+        assertEquals(error, Errors.forCode(addVoterResponse.errorCode()));
+
+        return addVoterResponse;
+    }
+
     List<RaftRequest.Outbound> collectEndQuorumRequests(
         int epoch,
         Set<Integer> destinationIdSet,
@@ -1339,12 +1366,8 @@ public final class RaftClientTestContext {
             voters
                 .stream()
                 .map(voterId -> new Voter().setVoterId(voterId))
-                .collect(Collectors.toList()),
-            leaderChangeMessage
-                    .voters()
-                    .stream()
-                    .sorted(Comparator.comparingInt(Voter::voterId))
-                    .collect(Collectors.toList())
+                .collect(Collectors.toSet()),
+            new HashSet<>(leaderChangeMessage.voters())
         );
         assertEquals(
             grantingVoters
@@ -1545,6 +1568,34 @@ public final class RaftClientTestContext {
         return RaftUtil.singletonDescribeQuorumRequest(metadataPartition);
     }
 
+    AddRaftVoterRequestData addVoterRequest(
+        int timeoutMs,
+        ReplicaKey voter,
+        Endpoints endpoints
+    ) {
+        return addVoterRequest(
+            clusterId.toString(),
+            timeoutMs,
+            voter,
+            endpoints
+        );
+    }
+
+    AddRaftVoterRequestData addVoterRequest(
+        String clusterId,
+        int timeoutMs,
+        ReplicaKey voter,
+        Endpoints endpoints
+    ) {
+        return RaftUtil.addVoterRequest(
+            clusterId,
+            timeoutMs,
+            voter,
+            endpoints
+        );
+    }
+
     private short fetchRpcVersion() {
         if (kip853Rpc) {
             return 17;
@@ -1593,6 +1644,15 @@ public final class RaftClientTestContext {
         }
     }
 
+    private short addVoterRpcVersion() {
+        if (kip853Rpc) {
+            return 0;
+        } else {
+            throw new IllegalStateException("Reconfiguration must be enabled 
by calling withKip853Rpc(true)");
+        }
+    }
+
     private short raftRequestVersion(ApiMessage request) {
         if (request instanceof FetchRequestData) {
             return fetchRpcVersion();
@@ -1606,6 +1666,8 @@ public final class RaftClientTestContext {
             return endQuorumEpochRpcVersion();
         } else if (request instanceof DescribeQuorumRequestData) {
             return describeQuorumRpcVersion();
+        } else if (request instanceof AddRaftVoterRequestData) {
+            return addVoterRpcVersion();
         } else {
             throw new IllegalArgumentException(String.format("Request %s is 
not a raft request", request));
         }
@@ -1624,6 +1686,10 @@ public final class RaftClientTestContext {
             return endQuorumEpochRpcVersion();
         } else if (response instanceof DescribeQuorumResponseData) {
             return describeQuorumRpcVersion();
+        } else if (response instanceof AddRaftVoterResponseData) {
+            return addVoterRpcVersion();
+        } else if (response instanceof ApiVersionsResponseData) {
+            return 4;
         } else {
             throw new IllegalArgumentException(String.format("Request %s is 
not a raft response", response));
         }

