This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b73e31eb159 KAFKA-17641; Update Vote RPC with PreVote field (#17807)
b73e31eb159 is described below

commit b73e31eb1597bc55d59096b28a5a3e1b4d2ecc9a
Author: Alyssa Huang <[email protected]>
AuthorDate: Fri Dec 13 13:24:30 2024 -0800

    KAFKA-17641; Update Vote RPC with PreVote field (#17807)
    
    Introduces v2 of the Vote RPC and implements the handling of the new version of 
the RPC.
    
    Many references to "candidate" in the Vote RPC are changed to the more 
generic "replica". Replicas sending a Vote request with PreVote set to true are 
not candidates. They are instead prospective candidates that are attempting to 
become candidates.
    
    Replicas receiving PreVote requests (Vote requests with PreVote=true) with 
an epoch equal to their own will _not_ transition to Unattached state. They 
will only grant the vote if they have not recently fetched from the leader and the 
request's last epoch and offset are up-to-date with theirs.
    
    If a replica receives a PreVote request with an epoch greater than their 
current epoch, they will transition to Unattached state (setting their epoch to 
the one from the pre-vote request) and then grant the vote if the request's 
last epoch and offset are up-to-date with theirs.
    
    This is done to avoid a possible ping-pong scenario. For example, consider a 
3-node quorum in which leader node A disconnects from the quorum. Node B enters 
the prospective state before node C and sends a pre-vote request to node C, which 
is still in the follower state and replies that node A is the leader. Node B then 
transitions to follower, while node C transitions to prospective after its 
election timeout. If this interaction repeats, it is possible for such replicas 
to transition from Follower to Prospective in perpetuity [...]
    
    This change introduces a new suite called KafkaRaftClientPreVoteTest, for 
additional KRaft protocol tests with respect to pre-vote.
    
    Reviewers: José Armando García Sancio <[email protected]>
---
 .../apache/kafka/common/requests/VoteRequest.java  |   8 +-
 .../main/resources/common/message/VoteRequest.json |  23 +-
 .../resources/common/message/VoteResponse.json     |   3 +-
 .../kafka/common/requests/RequestResponseTest.java |  33 +-
 .../java/org/apache/kafka/raft/CandidateState.java |  19 +-
 .../java/org/apache/kafka/raft/EpochState.java     |  11 +-
 .../java/org/apache/kafka/raft/FollowerState.java  |  23 +-
 .../org/apache/kafka/raft/KafkaRaftClient.java     |  65 ++-
 .../java/org/apache/kafka/raft/LeaderState.java    |   7 +-
 .../java/org/apache/kafka/raft/QuorumState.java    |  10 +-
 .../main/java/org/apache/kafka/raft/RaftUtil.java  |  29 +-
 .../java/org/apache/kafka/raft/ResignedState.java  |  14 +-
 .../org/apache/kafka/raft/UnattachedState.java     |  33 +-
 .../org/apache/kafka/raft/CandidateStateTest.java  |  13 +-
 .../org/apache/kafka/raft/FollowerStateTest.java   |  24 +-
 .../kafka/raft/KafkaRaftClientPreVoteTest.java     | 576 +++++++++++++++++++++
 .../org/apache/kafka/raft/KafkaRaftClientTest.java |  14 +-
 .../org/apache/kafka/raft/LeaderStateTest.java     |  16 +-
 .../apache/kafka/raft/RaftClientTestContext.java   |  47 +-
 .../java/org/apache/kafka/raft/RaftUtilTest.java   |  44 +-
 .../org/apache/kafka/raft/ResignedStateTest.java   |  19 +-
 .../org/apache/kafka/raft/UnattachedStateTest.java |  42 +-
 .../kafka/raft/UnattachedStateWithVoteTest.java    |  66 ++-
 23 files changed, 964 insertions(+), 175 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/VoteRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/VoteRequest.java
index 531c33b5f83..4acf8d31ca5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/VoteRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/VoteRequest.java
@@ -71,8 +71,8 @@ public class VoteRequest extends AbstractRequest {
 
     public static VoteRequestData singletonRequest(TopicPartition 
topicPartition,
                                                    String clusterId,
-                                                   int candidateEpoch,
-                                                   int candidateId,
+                                                   int replicaEpoch,
+                                                   int replicaId,
                                                    int lastEpoch,
                                                    long lastEpochEndOffset) {
         return new VoteRequestData()
@@ -83,8 +83,8 @@ public class VoteRequest extends AbstractRequest {
                            .setPartitions(Collections.singletonList(
                                new VoteRequestData.PartitionData()
                                    
.setPartitionIndex(topicPartition.partition())
-                                   .setCandidateEpoch(candidateEpoch)
-                                   .setCandidateId(candidateId)
+                                   .setReplicaEpoch(replicaEpoch)
+                                   .setReplicaId(replicaId)
                                    .setLastOffsetEpoch(lastEpoch)
                                    .setLastOffset(lastEpochEndOffset))
                            )));
diff --git a/clients/src/main/resources/common/message/VoteRequest.json 
b/clients/src/main/resources/common/message/VoteRequest.json
index 6a2e58a3515..80cb580d199 100644
--- a/clients/src/main/resources/common/message/VoteRequest.json
+++ b/clients/src/main/resources/common/message/VoteRequest.json
@@ -18,8 +18,9 @@
   "type": "request",
   "listeners": ["controller"],
   "name": "VoteRequest",
-  // Version 1 adds voter key and candidate directory id (KIP-853)
-  "validVersions": "0-1",
+  // Version 1 adds voter key and directory id (KIP-853)
+  // Version 2 adds PreVote field and renames candidate to replica
+  "validVersions": "0-2",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "ClusterId", "type": "string", "versions": "0+",
@@ -35,18 +36,20 @@
           "versions": "0+", "about": "The partition data.", "fields": [
             { "name": "PartitionIndex", "type": "int32", "versions": "0+",
               "about": "The partition index." },
-            { "name": "CandidateEpoch", "type": "int32", "versions": "0+",
-              "about": "The bumped epoch of the candidate sending the 
request."},
-            { "name": "CandidateId", "type": "int32", "versions": "0+", 
"entityType": "brokerId",
-              "about": "The replica id of the voter sending the request."},
-            { "name": "CandidateDirectoryId", "type": "uuid", "versions": 
"1+", "ignorable": true,
-              "about": "The directory id of the voter sending the request." },
+            { "name": "ReplicaEpoch", "type": "int32", "versions": "0+",
+              "about": "The epoch of the voter sending the request"},
+            { "name": "ReplicaId", "type": "int32", "versions": "0+", 
"entityType": "brokerId",
+              "about": "The replica id of the voter sending the request"},
+            { "name": "ReplicaDirectoryId", "type": "uuid", "versions": "1+", 
"ignorable": true,
+              "about": "The directory id of the voter sending the request" },
             { "name": "VoterDirectoryId", "type": "uuid", "versions": "1+", 
"ignorable": true,
-              "about": "The ID of the voter sending the request."},
+              "about": "The directory id of the voter receiving the request"},
             { "name": "LastOffsetEpoch", "type": "int32", "versions": "0+",
               "about": "The epoch of the last record written to the metadata 
log."},
             { "name": "LastOffset", "type": "int64", "versions": "0+",
-              "about": "The offset of the last record written to the metadata 
log."}
+              "about": "The log end offset of the metadata log of the voter 
sending the request."},
+            { "name": "PreVote", "type": "bool", "versions": "2+",
+              "about": "Whether the request is a PreVote request (not 
persisted) or not."}
           ]
         }
       ]
diff --git a/clients/src/main/resources/common/message/VoteResponse.json 
b/clients/src/main/resources/common/message/VoteResponse.json
index 56ef409284b..d8ffa4bb4f8 100644
--- a/clients/src/main/resources/common/message/VoteResponse.json
+++ b/clients/src/main/resources/common/message/VoteResponse.json
@@ -18,7 +18,8 @@
   "type": "response",
   "name": "VoteResponse",
   // Version 1 adds leader endpoint (KIP-853)
-  "validVersions": "0-1",
+  // Version 2 handles PreVote requests
+  "validVersions": "0-2",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "ErrorCode", "type": "int16", "versions": "0+",
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index c0dce6cbd1b..a838dafacd5 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -1182,7 +1182,7 @@ public class RequestResponseTest {
             case ALTER_CLIENT_QUOTAS: return createAlterClientQuotasResponse();
             case DESCRIBE_USER_SCRAM_CREDENTIALS: return 
createDescribeUserScramCredentialsResponse();
             case ALTER_USER_SCRAM_CREDENTIALS: return 
createAlterUserScramCredentialsResponse();
-            case VOTE: return createVoteResponse();
+            case VOTE: return createVoteResponse(version);
             case BEGIN_QUORUM_EPOCH: return createBeginQuorumEpochResponse();
             case END_QUORUM_EPOCH: return createEndQuorumEpochResponse();
             case DESCRIBE_QUORUM: return createDescribeQuorumResponse();
@@ -1701,29 +1701,34 @@ public class RequestResponseTest {
     }
 
     private VoteRequest createVoteRequest(short version) {
+        VoteRequestData.PartitionData partitionData = new 
VoteRequestData.PartitionData()
+            .setPartitionIndex(0)
+            .setReplicaEpoch(1)
+            .setReplicaId(2)
+            .setLastOffset(3L)
+            .setLastOffsetEpoch(4);
+        if (version >= 2) {
+            partitionData.setPreVote(true);
+        }
         VoteRequestData data = new VoteRequestData()
                 .setClusterId("clusterId")
                 .setTopics(singletonList(new VoteRequestData.TopicData()
-                        .setPartitions(singletonList(new 
VoteRequestData.PartitionData()
-                                .setPartitionIndex(0)
-                                .setCandidateEpoch(1)
-                                .setCandidateId(2)
-                                .setLastOffset(3L)
-                                .setLastOffsetEpoch(4)))
+                        .setPartitions(singletonList(partitionData))
                         .setTopicName("topic1")));
         return new VoteRequest.Builder(data).build(version);
     }
 
-    private VoteResponse createVoteResponse() {
+    private VoteResponse createVoteResponse(short version) {
+        VoteResponseData.PartitionData partitionData = new 
VoteResponseData.PartitionData()
+            .setErrorCode(Errors.NONE.code())
+            .setLeaderEpoch(0)
+            .setPartitionIndex(1)
+            .setLeaderId(2)
+            .setVoteGranted(false);
         VoteResponseData data = new VoteResponseData()
                 .setErrorCode(Errors.NONE.code())
                 .setTopics(singletonList(new VoteResponseData.TopicData()
-                        .setPartitions(singletonList(new 
VoteResponseData.PartitionData()
-                                .setErrorCode(Errors.NONE.code())
-                                .setLeaderEpoch(0)
-                                .setPartitionIndex(1)
-                                .setLeaderId(2)
-                                .setVoteGranted(false)))));
+                        .setPartitions(singletonList(partitionData))));
         return new VoteResponse(data);
     }
 
diff --git a/raft/src/main/java/org/apache/kafka/raft/CandidateState.java 
b/raft/src/main/java/org/apache/kafka/raft/CandidateState.java
index 175df7760de..902d334ad79 100644
--- a/raft/src/main/java/org/apache/kafka/raft/CandidateState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/CandidateState.java
@@ -276,15 +276,22 @@ public class CandidateState implements EpochState {
 
     @Override
     public boolean canGrantVote(
-        ReplicaKey candidateKey,
-        boolean isLogUpToDate
+        ReplicaKey replicaKey,
+        boolean isLogUpToDate,
+        boolean isPreVote
     ) {
-        // Still reject vote request even candidateId = localId, Although the 
candidate votes for
+        if (isPreVote && isLogUpToDate) {
+            return true;
+        }
+        // Reject standard vote requests even if replicaId = localId, although 
the replica votes for
         // itself, this vote is implicit and not "granted".
         log.debug(
-            "Rejecting vote request from candidate ({}) since we are already 
candidate in epoch {}",
-            candidateKey,
-            epoch
+            "Rejecting Vote request (preVote={}) from replica ({}) since we 
are in CandidateState in epoch {} " +
+                "and the replica's log is up-to-date={}",
+            isPreVote,
+            replicaKey,
+            epoch,
+            isLogUpToDate
         );
         return false;
     }
diff --git a/raft/src/main/java/org/apache/kafka/raft/EpochState.java 
b/raft/src/main/java/org/apache/kafka/raft/EpochState.java
index 4f6baec79da..c47fc087f0d 100644
--- a/raft/src/main/java/org/apache/kafka/raft/EpochState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/EpochState.java
@@ -26,16 +26,17 @@ public interface EpochState extends Closeable {
     }
 
     /**
-     * Decide whether to grant a vote to a candidate.
+     * Decide whether to grant a vote to a replica.
      *
      * It is the responsibility of the caller to invoke
-     * {@link QuorumState#transitionToUnattachedVotedState(int, ReplicaKey)} 
if vote is granted.
+     * {@link QuorumState#transitionToUnattachedVotedState(int, ReplicaKey)} 
if a standard vote is granted.
      *
-     * @param candidateKey the id and directory of the candidate
-     * @param isLogUpToDate whether the candidate’s log is at least as 
up-to-date as receiver’s log
+     * @param replicaKey the id and directory of the replica requesting the 
vote
+     * @param isLogUpToDate whether the replica's log is at least as 
up-to-date as receiver’s log
+     * @param isPreVote whether the vote request is a PreVote (non-binding) or 
standard vote
      * @return true if it can grant the vote, false otherwise
      */
-    boolean canGrantVote(ReplicaKey candidateKey, boolean isLogUpToDate);
+    boolean canGrantVote(ReplicaKey replicaKey, boolean isLogUpToDate, boolean 
isPreVote);
 
     /**
      * Get the current election state, which is guaranteed to be immutable.
diff --git a/raft/src/main/java/org/apache/kafka/raft/FollowerState.java 
b/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
index 67748f54adc..e3db67893bf 100644
--- a/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
@@ -37,6 +37,10 @@ public class FollowerState implements EpochState {
     private final Set<Integer> voters;
     // Used for tracking the expiration of both the Fetch and FetchSnapshot 
requests
     private final Timer fetchTimer;
+    /* Used to track if the replica has fetched successfully from the leader 
at least once since the transition to
+     * follower in this epoch. If the replica has not yet fetched 
successfully, it may be able to grant PreVotes.
+     */
+    private boolean hasFetchedFromLeader;
     private Optional<LogOffsetMetadata> highWatermark;
     /* Used to track the currently fetching snapshot. When fetching snapshot 
regular
      * Fetch request are paused
@@ -66,6 +70,7 @@ public class FollowerState implements EpochState {
         this.updateVoterPeriodTimer = time.timer(updateVoterPeriodMs());
         this.highWatermark = highWatermark;
         this.log = logContext.logger(FollowerState.class);
+        this.hasFetchedFromLeader = false;
     }
 
     @Override
@@ -118,9 +123,10 @@ public class FollowerState implements EpochState {
         return fetchTimer.isExpired();
     }
 
-    public void resetFetchTimeout(long currentTimeMs) {
+    public void resetFetchTimeoutForSuccessfulFetch(long currentTimeMs) {
         fetchTimer.update(currentTimeMs);
         fetchTimer.reset(fetchTimeoutMs);
+        hasFetchedFromLeader = true;
     }
 
     public void overrideFetchTimeout(long currentTimeMs, long timeoutMs) {
@@ -202,12 +208,19 @@ public class FollowerState implements EpochState {
     }
 
     @Override
-    public boolean canGrantVote(ReplicaKey candidateKey, boolean 
isLogUpToDate) {
+    public boolean canGrantVote(ReplicaKey replicaKey, boolean isLogUpToDate, 
boolean isPreVote) {
+        if (isPreVote && !hasFetchedFromLeader && isLogUpToDate) {
+            return true;
+        }
         log.debug(
-            "Rejecting vote request from candidate ({}) since we already have 
a leader {} in epoch {}",
-            candidateKey,
+            "Rejecting Vote request (preVote={}) from replica ({}) since we 
are in FollowerState with leader {} in " +
+                "epoch {}, hasFetchedFromLeader={}, replica's log is 
up-to-date={}",
+            isPreVote,
+            replicaKey,
             leaderId,
-            epoch
+            epoch,
+            hasFetchedFromLeader,
+            isLogUpToDate
         );
         return false;
     }
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java 
b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 5f650796f5d..41da22b4b36 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -779,12 +779,32 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
         VoteRequestData.PartitionData partitionRequest =
             request.topics().get(0).partitions().get(0);
 
-        int candidateId = partitionRequest.candidateId();
-        int candidateEpoch = partitionRequest.candidateEpoch();
+        int replicaId = partitionRequest.replicaId();
+        int replicaEpoch = partitionRequest.replicaEpoch();
+        boolean preVote = partitionRequest.preVote();
 
         int lastEpoch = partitionRequest.lastOffsetEpoch();
         long lastEpochEndOffset = partitionRequest.lastOffset();
-        if (lastEpochEndOffset < 0 || lastEpoch < 0 || lastEpoch >= 
candidateEpoch) {
+        /* Validate the replica epoch and the log's last epoch.
+         *
+         * For a standard vote, the candidate replica increases the epoch 
before sending a vote request.
+         * So we expect the replicaEpoch to be strictly greater than the log's 
last epoch. This is always true because
+         * the candidate has never seen a leader at replicaEpoch.
+         *
+         * For a PreVote, the prospective replica doesn't increase the epoch 
so it is possible for there to be a leader
+         * and a record in the log at the prospective replica's replicaEpoch.
+         */
+        boolean isIllegalEpoch = preVote ? lastEpoch > replicaEpoch : 
lastEpoch >= replicaEpoch;
+        if (isIllegalEpoch) {
+            logger.info(
+                "Received a vote request from replica {} with illegal epoch 
{}, last epoch {}, preVote={}",
+                replicaId,
+                replicaEpoch,
+                lastEpoch,
+                preVote
+            );
+        }
+        if (lastEpochEndOffset < 0 || lastEpoch < 0 || isIllegalEpoch) {
             return buildVoteResponse(
                 requestMetadata.listenerName(),
                 requestMetadata.apiVersion(),
@@ -793,7 +813,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
             );
         }
 
-        Optional<Errors> errorOpt = validateVoterOnlyRequest(candidateId, 
candidateEpoch);
+        Optional<Errors> errorOpt = validateVoterOnlyRequest(replicaId, 
replicaEpoch);
         if (errorOpt.isPresent()) {
             return buildVoteResponse(
                 requestMetadata.listenerName(),
@@ -803,15 +823,15 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
             );
         }
 
-        if (candidateEpoch > quorum.epoch()) {
-            transitionToUnattached(candidateEpoch);
+        if (replicaEpoch > quorum.epoch()) {
+            transitionToUnattached(replicaEpoch);
         }
 
         // Check that the request was intended for this replica
         Optional<ReplicaKey> voterKey = RaftUtil.voteRequestVoterKey(request, 
partitionRequest);
         if (!isValidVoterKey(voterKey)) {
             logger.info(
-                "Candidate sent a voter key ({}) in the VOTE request that 
doesn't match the " +
+                "A replica sent a voter key ({}) in the VOTE request that 
doesn't match the " +
                 "local key ({}, {}); rejecting the vote",
                 voterKey,
                 nodeId,
@@ -827,20 +847,26 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
         }
 
         OffsetAndEpoch lastEpochEndOffsetAndEpoch = new 
OffsetAndEpoch(lastEpochEndOffset, lastEpoch);
-        ReplicaKey candidateKey = ReplicaKey.of(
-            candidateId,
-            partitionRequest.candidateDirectoryId()
+        ReplicaKey replicaKey = ReplicaKey.of(
+            replicaId,
+            partitionRequest.replicaDirectoryId()
         );
         boolean voteGranted = quorum.canGrantVote(
-            candidateKey,
-            lastEpochEndOffsetAndEpoch.compareTo(endOffset()) >= 0
+            replicaKey,
+            lastEpochEndOffsetAndEpoch.compareTo(endOffset()) >= 0,
+            preVote
         );
 
-        if (voteGranted && quorum.isUnattachedNotVoted()) {
-            transitionToUnattachedVoted(candidateKey, candidateEpoch);
+        if (!preVote && voteGranted && quorum.isUnattachedNotVoted()) {
+            transitionToUnattachedVoted(replicaKey, replicaEpoch);
         }
 
-        logger.info("Vote request {} with epoch {} is {}", request, 
candidateEpoch, voteGranted ? "granted" : "rejected");
+        logger.info(
+            "Vote request {} with epoch {} is {}",
+            request,
+            replicaEpoch,
+            voteGranted ? "granted" : "rejected"
+        );
         return buildVoteResponse(
             requestMetadata.listenerName(),
             requestMetadata.apiVersion(),
@@ -1684,7 +1710,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
                 updateFollowerHighWatermark(state, highWatermark);
             }
 
-            state.resetFetchTimeout(currentTimeMs);
+            state.resetFetchTimeoutForSuccessfulFetch(currentTimeMs);
             return true;
         } else {
             return handleUnexpectedError(error, responseMetadata);
@@ -1988,7 +2014,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
                 partitionSnapshot.snapshotId()
             );
             state.setFetchingSnapshot(Optional.empty());
-            state.resetFetchTimeout(currentTimeMs);
+            state.resetFetchTimeoutForSuccessfulFetch(currentTimeMs);
             return true;
         }
 
@@ -2066,7 +2092,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
             }
         }
 
-        state.resetFetchTimeout(currentTimeMs);
+        state.resetFetchTimeoutForSuccessfulFetch(currentTimeMs);
         return true;
     }
 
@@ -2733,7 +2759,8 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
             quorum.localReplicaKeyOrThrow(),
             remoteVoter,
             endOffset.epoch(),
-            endOffset.offset()
+            endOffset.offset(),
+            false
         );
     }
 
diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java 
b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
index 0024c849b21..eb5f8cb3d87 100644
--- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
@@ -831,10 +831,11 @@ public class LeaderState<T> implements EpochState {
     }
 
     @Override
-    public boolean canGrantVote(ReplicaKey candidateKey, boolean 
isLogUpToDate) {
+    public boolean canGrantVote(ReplicaKey replicaKey, boolean isLogUpToDate, 
boolean isPreVote) {
         log.debug(
-            "Rejecting vote request from candidate ({}) since we are already 
leader in epoch {}",
-            candidateKey,
+            "Rejecting Vote request (preVote={}) from replica ({}) since we 
are already leader in epoch {}",
+            isPreVote,
+            replicaKey,
             epoch
         );
         return false;
diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java 
b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
index de82977583a..4e1ba679e35 100644
--- a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
@@ -44,14 +44,10 @@ import java.util.Random;
  *    Follower: After discovering a leader with an equal or larger epoch
  *
  * Unattached transitions to:
- *    Unattached: After learning of a new election with a higher epoch or 
after voting
+ *    Unattached: After learning of a new election with a higher epoch or 
after giving a binding vote
  *    Candidate: After expiration of the election timeout
  *    Follower: After discovering a leader with an equal or larger epoch
  *
- * Voted transitions to:
- *    Unattached: After learning of a new election with a higher epoch
- *    Candidate: After expiration of the election timeout
- *
  * Candidate transitions to:
  *    Unattached: After learning of a new election with a higher epoch
  *    Candidate: After expiration of the election timeout
@@ -641,8 +637,8 @@ public class QuorumState {
         return electionTimeoutMs + random.nextInt(electionTimeoutMs);
     }
 
-    public boolean canGrantVote(ReplicaKey candidateKey, boolean 
isLogUpToDate) {
-        return state.canGrantVote(candidateKey, isLogUpToDate);
+    public boolean canGrantVote(ReplicaKey replicaKey, boolean isLogUpToDate, 
boolean isPreVote) {
+        return state.canGrantVote(replicaKey, isLogUpToDate, isPreVote);
     }
 
     public FollowerState followerStateOrThrow() {
diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java 
b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
index 018bec0d632..12c48955b39 100644
--- a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
+++ b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
@@ -144,11 +144,12 @@ public class RaftUtil {
     public static VoteRequestData singletonVoteRequest(
         TopicPartition topicPartition,
         String clusterId,
-        int candidateEpoch,
-        ReplicaKey candidateKey,
+        int replicaEpoch,
+        ReplicaKey replicaKey,
         ReplicaKey voterKey,
         int lastEpoch,
-        long lastEpochEndOffset
+        long lastEpochEndOffset,
+        boolean preVote
     ) {
         return new VoteRequestData()
             .setClusterId(clusterId)
@@ -161,10 +162,10 @@ public class RaftUtil {
                             Collections.singletonList(
                                 new VoteRequestData.PartitionData()
                                     
.setPartitionIndex(topicPartition.partition())
-                                    .setCandidateEpoch(candidateEpoch)
-                                    .setCandidateId(candidateKey.id())
-                                    .setCandidateDirectoryId(
-                                        candidateKey
+                                    .setReplicaEpoch(replicaEpoch)
+                                    .setReplicaId(replicaKey.id())
+                                    .setReplicaDirectoryId(
+                                        replicaKey
                                             .directoryId()
                                             .orElse(ReplicaKey.NO_DIRECTORY_ID)
                                     )
@@ -175,6 +176,7 @@ public class RaftUtil {
                                     )
                                     .setLastOffsetEpoch(lastEpoch)
                                     .setLastOffset(lastEpochEndOffset)
+                                    .setPreVote(preVote)
                             )
                         )
                 )
@@ -192,17 +194,18 @@ public class RaftUtil {
         boolean voteGranted,
         Endpoints endpoints
     ) {
+        VoteResponseData.PartitionData partitionData = new 
VoteResponseData.PartitionData()
+            .setErrorCode(partitionLevelError.code())
+            .setLeaderId(leaderId)
+            .setLeaderEpoch(leaderEpoch)
+            .setVoteGranted(voteGranted);
+
         VoteResponseData response = new VoteResponseData()
             .setErrorCode(topLevelError.code())
             .setTopics(Collections.singletonList(
                 new VoteResponseData.TopicData()
                     .setTopicName(topicPartition.topic())
-                    .setPartitions(Collections.singletonList(
-                        new VoteResponseData.PartitionData()
-                            .setErrorCode(partitionLevelError.code())
-                            .setLeaderId(leaderId)
-                            .setLeaderEpoch(leaderEpoch)
-                            .setVoteGranted(voteGranted)))));
+                    .setPartitions(Collections.singletonList(partitionData))));
 
         if (apiVersion >= 1) {
             Optional<InetSocketAddress> address = 
endpoints.address(listenerName);
diff --git a/raft/src/main/java/org/apache/kafka/raft/ResignedState.java 
b/raft/src/main/java/org/apache/kafka/raft/ResignedState.java
index eaee0496b82..d79ee44c846 100644
--- a/raft/src/main/java/org/apache/kafka/raft/ResignedState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/ResignedState.java
@@ -140,11 +140,17 @@ public class ResignedState implements EpochState {
     }
 
     @Override
-    public boolean canGrantVote(ReplicaKey candidateKey, boolean 
isLogUpToDate) {
+    public boolean canGrantVote(ReplicaKey replicaKey, boolean isLogUpToDate, 
boolean isPreVote) {
+        if (isPreVote && isLogUpToDate) {
+            return true;
+        }
         log.debug(
-            "Rejecting vote request from candidate ({}) since we have resigned 
as candidate/leader in epoch {}",
-            candidateKey,
-            epoch
+            "Rejecting Vote request (preVote={}) from replica ({}) since we 
are in ResignedState in epoch {} " +
+                "and the replica's log is up-to-date={}",
+            isPreVote,
+            replicaKey,
+            epoch,
+            isLogUpToDate
         );
 
         return false;
diff --git a/raft/src/main/java/org/apache/kafka/raft/UnattachedState.java 
b/raft/src/main/java/org/apache/kafka/raft/UnattachedState.java
index f41c6b8d563..94a596d4115 100644
--- a/raft/src/main/java/org/apache/kafka/raft/UnattachedState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/UnattachedState.java
@@ -119,16 +119,18 @@ public class UnattachedState implements EpochState {
     }
 
     @Override
-    public boolean canGrantVote(ReplicaKey candidateKey, boolean 
isLogUpToDate) {
-        if (votedKey.isPresent()) {
+    public boolean canGrantVote(ReplicaKey replicaKey, boolean isLogUpToDate, 
boolean isPreVote) {
+        if (isPreVote) {
+            return canGrantPreVote(replicaKey, isLogUpToDate);
+        } else if (votedKey.isPresent()) {
             ReplicaKey votedReplicaKey = votedKey.get();
-            if (votedReplicaKey.id() == candidateKey.id()) {
-                return votedReplicaKey.directoryId().isEmpty() || 
votedReplicaKey.directoryId().equals(candidateKey.directoryId());
+            if (votedReplicaKey.id() == replicaKey.id()) {
+                return votedReplicaKey.directoryId().isEmpty() || 
votedReplicaKey.directoryId().equals(replicaKey.directoryId());
             }
             log.debug(
-                "Rejecting vote request from candidate ({}), already have 
voted for another " +
+                "Rejecting Vote request (preVote=false) from candidate ({}), 
already have voted for another " +
                     "candidate ({}) in epoch {}",
-                candidateKey,
+                replicaKey,
                 votedKey,
                 epoch
             );
@@ -136,16 +138,27 @@ public class UnattachedState implements EpochState {
         } else if (leaderId.isPresent()) {
             // If the leader id is known it should behave similar to the 
follower state
             log.debug(
-                "Rejecting vote request from candidate ({}) since we already 
have a leader {} in epoch {}",
-                candidateKey,
+                "Rejecting Vote request (preVote=false) from candidate ({}) 
since we already have a leader {} in epoch {}",
+                replicaKey,
                 leaderId,
                 epoch
             );
             return false;
         } else if (!isLogUpToDate) {
             log.debug(
-                "Rejecting vote request from candidate ({}) since candidate 
epoch/offset is not up to date with us",
-                candidateKey
+                "Rejecting Vote request (preVote=false) from candidate ({}) 
since candidate epoch/offset is not up to date with us",
+                replicaKey
+            );
+        }
+
+        return isLogUpToDate;
+    }
+
+    private boolean canGrantPreVote(ReplicaKey replicaKey, boolean 
isLogUpToDate) {
+        if (!isLogUpToDate) {
+            log.debug(
+                "Rejecting Vote request (preVote=true) from replica ({}) since 
replica's log is not up to date with us",
+                replicaKey
             );
         }
 
diff --git a/raft/src/test/java/org/apache/kafka/raft/CandidateStateTest.java 
b/raft/src/test/java/org/apache/kafka/raft/CandidateStateTest.java
index 4f764d43a21..e494a306cad 100644
--- a/raft/src/test/java/org/apache/kafka/raft/CandidateStateTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/CandidateStateTest.java
@@ -219,10 +219,15 @@ public class CandidateStateTest {
             voterSetWithLocal(Stream.of(node1, node2, node3), withDirectoryId)
         );
 
-        assertFalse(state.canGrantVote(node0, isLogUpToDate));
-        assertFalse(state.canGrantVote(node1, isLogUpToDate));
-        assertFalse(state.canGrantVote(node2, isLogUpToDate));
-        assertFalse(state.canGrantVote(node3, isLogUpToDate));
+        assertEquals(isLogUpToDate, state.canGrantVote(node0, isLogUpToDate, 
true));
+        assertEquals(isLogUpToDate, state.canGrantVote(node1, isLogUpToDate, 
true));
+        assertEquals(isLogUpToDate, state.canGrantVote(node2, isLogUpToDate, 
true));
+        assertEquals(isLogUpToDate, state.canGrantVote(node3, isLogUpToDate, 
true));
+
+        assertFalse(state.canGrantVote(node0, isLogUpToDate, false));
+        assertFalse(state.canGrantVote(node1, isLogUpToDate, false));
+        assertFalse(state.canGrantVote(node2, isLogUpToDate, false));
+        assertFalse(state.canGrantVote(node3, isLogUpToDate, false));
     }
 
     @ParameterizedTest
diff --git a/raft/src/test/java/org/apache/kafka/raft/FollowerStateTest.java 
b/raft/src/test/java/org/apache/kafka/raft/FollowerStateTest.java
index 8c9c874e526..4c93cad065a 100644
--- a/raft/src/test/java/org/apache/kafka/raft/FollowerStateTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/FollowerStateTest.java
@@ -92,6 +92,19 @@ public class FollowerStateTest {
         assertEquals(Optional.of(new LogOffsetMetadata(15L)), 
state.highWatermark());
     }
 
+    /**
+     * A follower that has never fetched successfully from its leader decides pre-votes purely on
+     * whether the requesting replica's log is up to date.
+     */
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testPreVoteIfHasNotFetchedFromLeaderYet(boolean isLogUpToDate) {
+        FollowerState state = newFollowerState(Set.of(1, 2, 3), Optional.empty());
+
+        // every voter gets the same answer: granted iff its log is up to date
+        for (int voterId = 1; voterId <= 3; voterId++) {
+            ReplicaKey requester = ReplicaKey.of(voterId, ReplicaKey.NO_DIRECTORY_ID);
+            assertEquals(isLogUpToDate, state.canGrantVote(requester, isLogUpToDate, true));
+        }
+    }
+
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     public void testGrantVote(boolean isLogUpToDate) {
@@ -99,10 +112,15 @@ public class FollowerStateTest {
             Set.of(1, 2, 3),
             Optional.empty()
         );
+        state.resetFetchTimeoutForSuccessfulFetch(time.milliseconds());
+
+        assertFalse(state.canGrantVote(ReplicaKey.of(1, 
ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate, true));
+        assertFalse(state.canGrantVote(ReplicaKey.of(2, 
ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate, true));
+        assertFalse(state.canGrantVote(ReplicaKey.of(3, 
ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate, true));
 
-        assertFalse(state.canGrantVote(ReplicaKey.of(1, 
ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate));
-        assertFalse(state.canGrantVote(ReplicaKey.of(2, 
ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate));
-        assertFalse(state.canGrantVote(ReplicaKey.of(3, 
ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate));
+        assertFalse(state.canGrantVote(ReplicaKey.of(1, 
ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate, false));
+        assertFalse(state.canGrantVote(ReplicaKey.of(2, 
ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate, false));
+        assertFalse(state.canGrantVote(ReplicaKey.of(3, 
ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate, false));
     }
 
     @Test
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientPreVoteTest.java 
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientPreVoteTest.java
new file mode 100644
index 00000000000..a27853ce64a
--- /dev/null
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientPreVoteTest.java
@@ -0,0 +1,576 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.MemoryRecords;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.OptionalInt;
+import java.util.Set;
+
+import static org.apache.kafka.raft.KafkaRaftClientTest.randomReplicaId;
+import static org.apache.kafka.raft.KafkaRaftClientTest.replicaKey;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class KafkaRaftClientPreVoteTest {
+    /**
+     * Verifies how a voting follower handles pre-vote requests.
+     *
+     * <p>Within the current epoch the follower grants a pre-vote (from a voter or from an observer)
+     * only if it has not yet fetched successfully from the elected leader, and it keeps following
+     * that leader either way. A pre-vote carrying a higher epoch is granted and moves the follower
+     * to unattached (not voted) in the new epoch.
+     */
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testHandlePreVoteRequestAsFollowerWithElectedLeader(boolean hasFetchedFromLeader) throws Exception {
+        int localId = randomReplicaId();
+        int epoch = 2;
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, true);
+        int electedLeaderId = localId + 2;
+        // The observer must use an id outside the voter set. Reusing localId + 2 would make it the
+        // elected leader, so the "observer" request below would really come from a voter.
+        ReplicaKey observer = replicaKey(localId + 3, true);
+        Set<Integer> voters = Set.of(localId, otherNodeKey.id(), electedLeaderId);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+            .withElectedLeader(epoch, electedLeaderId)
+            .withKip853Rpc(true)
+            .build();
+
+        if (hasFetchedFromLeader) {
+            context.pollUntilRequest();
+            RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
+            context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+
+            context.deliverResponse(
+                fetchRequest.correlationId(),
+                fetchRequest.destination(),
+                context.fetchResponse(epoch, electedLeaderId, MemoryRecords.EMPTY, 0L, Errors.NONE)
+            );
+        }
+
+        // follower should reject pre-vote requests with the same epoch if it has successfully fetched from the leader
+        context.deliverRequest(context.preVoteRequest(epoch, otherNodeKey, epoch, 1));
+        context.pollUntilResponse();
+
+        boolean voteGranted = !hasFetchedFromLeader;
+        context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.of(electedLeaderId), voteGranted);
+        context.assertElectedLeader(epoch, electedLeaderId);
+
+        // same with observers
+        context.deliverRequest(context.preVoteRequest(epoch, observer, epoch, 1));
+        context.pollUntilResponse();
+
+        context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.of(electedLeaderId), voteGranted);
+        context.assertElectedLeader(epoch, electedLeaderId);
+
+        // follower will transition to unattached if pre-vote request has a higher epoch
+        context.deliverRequest(context.preVoteRequest(epoch + 1, otherNodeKey, epoch + 1, 1));
+        context.pollUntilResponse();
+
+        context.assertSentVoteResponse(Errors.NONE, epoch + 1, OptionalInt.of(-1), true);
+        // assertEquals takes (expected, actual); the swapped order produced misleading failure messages
+        assertEquals(epoch + 1, context.currentEpoch());
+        assertTrue(context.client.quorum().isUnattachedNotVoted());
+    }
+
+    /**
+     * A candidate grants same-epoch pre-votes, from a voter and from an observer, while remaining a
+     * candidate that voted for itself. A pre-vote with a higher epoch is granted and moves the
+     * candidate to unattached.
+     */
+    @Test
+    public void testHandlePreVoteRequestAsCandidate() throws Exception {
+        int localId = randomReplicaId();
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, true);
+        ReplicaKey observer = replicaKey(localId + 2, true);
+        // despite its name, this is the epoch in which the local replica is a candidate (self-voted)
+        int leaderEpoch = 2;
+        Set<Integer> voters = Set.of(localId, otherNodeKey.id());
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withVotedCandidate(leaderEpoch, ReplicaKey.of(localId, 
ReplicaKey.NO_DIRECTORY_ID))
+            .withKip853Rpc(true)
+            .build();
+        assertTrue(context.client.quorum().isCandidate());
+
+        // candidate should grant pre-vote requests with the same epoch if log 
is up-to-date
+        context.deliverRequest(context.preVoteRequest(leaderEpoch, 
otherNodeKey, leaderEpoch, 1));
+        context.pollUntilResponse();
+
+        context.assertSentVoteResponse(Errors.NONE, leaderEpoch, 
OptionalInt.empty(), true);
+        context.assertVotedCandidate(leaderEpoch, localId);
+        assertTrue(context.client.quorum().isCandidate());
+
+        // if an observer sends a pre-vote request for the same epoch, it 
should also be granted
+        context.deliverRequest(context.preVoteRequest(leaderEpoch, observer, 
leaderEpoch, 1));
+        context.pollUntilResponse();
+
+        context.assertSentVoteResponse(Errors.NONE, leaderEpoch, 
OptionalInt.empty(), true);
+        context.assertVotedCandidate(leaderEpoch, localId);
+        assertTrue(context.client.quorum().isCandidate());
+
+        // candidate will transition to unattached if pre-vote request has a 
higher epoch
+        context.deliverRequest(context.preVoteRequest(leaderEpoch + 1, 
otherNodeKey, leaderEpoch + 1, 1));
+        context.pollUntilResponse();
+
+        context.assertSentVoteResponse(Errors.NONE, leaderEpoch + 1, 
OptionalInt.of(-1), true);
+        assertTrue(context.client.quorum().isUnattached());
+    }
+
+    /**
+     * An unattached observer with no known leader grants every same-epoch pre-vote. This covers a
+     * repeated request, a request from another voter, and a request from another observer. The
+     * replica stays unattached throughout.
+     */
+    @Test
+    public void testHandlePreVoteRequestAsUnattachedObserver() throws 
Exception {
+        int localId = randomReplicaId();
+        int epoch = 2;
+        ReplicaKey replica1 = replicaKey(localId + 1, true);
+        ReplicaKey replica2 = replicaKey(localId + 2, true);
+        ReplicaKey observer = replicaKey(localId + 3, true);
+        // localId is deliberately not a voter, so the local replica is an observer
+        Set<Integer> voters = Set.of(replica1.id(), replica2.id());
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withUnknownLeader(epoch)
+            .withKip853Rpc(true)
+            .build();
+
+        context.deliverRequest(context.preVoteRequest(epoch, replica1, epoch, 
1));
+        context.pollUntilResponse();
+
+        assertTrue(context.client.quorum().isUnattached());
+        context.assertSentVoteResponse(Errors.NONE, epoch, 
OptionalInt.empty(), true);
+
+        // if same replica sends another pre-vote request for the same epoch, 
it should be granted
+        context.deliverRequest(context.preVoteRequest(epoch, replica1, epoch, 
1));
+        context.pollUntilResponse();
+
+        assertTrue(context.client.quorum().isUnattached());
+        context.assertSentVoteResponse(Errors.NONE, epoch, 
OptionalInt.empty(), true);
+
+        // if different replica sends a pre-vote request for the same epoch, 
it should be granted
+        context.deliverRequest(context.preVoteRequest(epoch, replica2, epoch, 
1));
+        context.pollUntilResponse();
+
+        assertTrue(context.client.quorum().isUnattached());
+        context.assertSentVoteResponse(Errors.NONE, epoch, 
OptionalInt.empty(), true);
+
+        // if an observer sends a pre-vote request for the same epoch, it 
should also be granted
+        context.deliverRequest(context.preVoteRequest(epoch, observer, epoch, 
1));
+        context.pollUntilResponse();
+
+        assertTrue(context.client.quorum().isUnattached());
+        context.assertSentVoteResponse(Errors.NONE, epoch, 
OptionalInt.empty(), true);
+    }
+
+    /**
+     * An unattached replica that has already voted (for {@code replica2}) in the current epoch
+     * grants every same-epoch pre-vote. Granting a pre-vote must not clear the recorded vote, so
+     * the replica is checked to still be unattached-and-voted after each request, not merely
+     * unattached.
+     */
+    @Test
+    public void testHandlePreVoteRequestAsUnattachedVoted() throws Exception {
+        int localId = randomReplicaId();
+        int epoch = 2;
+        ReplicaKey replica1 = replicaKey(localId + 1, true);
+        ReplicaKey replica2 = replicaKey(localId + 2, true);
+        ReplicaKey observer = replicaKey(localId + 3, true);
+        Set<Integer> voters = Set.of(replica1.id(), replica2.id());
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+            .withVotedCandidate(epoch, replica2)
+            .withKip853Rpc(true)
+            .build();
+
+        context.deliverRequest(context.preVoteRequest(epoch, replica1, epoch, 1));
+        context.pollUntilResponse();
+
+        assertTrue(context.client.quorum().isUnattachedAndVoted());
+        context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.empty(), true);
+
+        // if same replica sends another pre-vote request for the same epoch, it should be granted
+        context.deliverRequest(context.preVoteRequest(epoch, replica1, epoch, 1));
+        context.pollUntilResponse();
+
+        assertTrue(context.client.quorum().isUnattachedAndVoted());
+        context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.empty(), true);
+
+        // if different replica sends a pre-vote request for the same epoch, it should be granted
+        context.deliverRequest(context.preVoteRequest(epoch, replica2, epoch, 1));
+        context.pollUntilResponse();
+
+        assertTrue(context.client.quorum().isUnattachedAndVoted());
+        context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.empty(), true);
+
+        // if an observer sends a pre-vote request for the same epoch, it should also be granted
+        context.deliverRequest(context.preVoteRequest(epoch, observer, epoch, 1));
+        context.pollUntilResponse();
+
+        assertTrue(context.client.quorum().isUnattachedAndVoted());
+        context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.empty(), true);
+    }
+
+    /**
+     * An unattached replica that knows the leader but has not voted grants every same-epoch
+     * pre-vote and reports the known leader id in each response. Granting a pre-vote must not
+     * record a vote, so the replica is checked to still be unattached-and-not-voted after each
+     * request, not merely unattached.
+     */
+    @Test
+    public void testHandlePreVoteRequestAsUnattachedWithLeader() throws Exception {
+        int localId = randomReplicaId();
+        int epoch = 2;
+        ReplicaKey replica1 = replicaKey(localId + 1, true);
+        ReplicaKey replica2 = replicaKey(localId + 2, true);
+        ReplicaKey leader = replicaKey(localId + 3, true);
+        ReplicaKey observer = replicaKey(localId + 4, true);
+        Set<Integer> voters = Set.of(replica1.id(), replica2.id());
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+            .withElectedLeader(epoch, leader.id())
+            .withKip853Rpc(true)
+            .build();
+
+        context.deliverRequest(context.preVoteRequest(epoch, replica1, epoch, 1));
+        context.pollUntilResponse();
+
+        assertTrue(context.client.quorum().isUnattachedNotVoted());
+        context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.of(leader.id()), true);
+
+        // if same replica sends another pre-vote request for the same epoch, it should be granted
+        context.deliverRequest(context.preVoteRequest(epoch, replica1, epoch, 1));
+        context.pollUntilResponse();
+
+        assertTrue(context.client.quorum().isUnattachedNotVoted());
+        context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.of(leader.id()), true);
+
+        // if different replica sends a pre-vote request for the same epoch, it should be granted
+        context.deliverRequest(context.preVoteRequest(epoch, replica2, epoch, 1));
+        context.pollUntilResponse();
+
+        assertTrue(context.client.quorum().isUnattachedNotVoted());
+        context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.of(leader.id()), true);
+
+        // if an observer sends a pre-vote request for the same epoch, it should also be granted
+        context.deliverRequest(context.preVoteRequest(epoch, observer, epoch, 1));
+        context.pollUntilResponse();
+
+        assertTrue(context.client.quorum().isUnattachedNotVoted());
+        context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.of(leader.id()), true);
+    }
+
+    /**
+     * An observer following an elected leader grants a same-epoch pre-vote only if it has not yet
+     * fetched successfully from that leader. The response carries the leader id either way, and
+     * the replica remains a follower.
+     */
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testHandlePreVoteRequestAsFollowerObserver(boolean 
hasFetchedFromLeader) throws Exception {
+        int localId = randomReplicaId();
+        int epoch = 2;
+        int leaderId = localId + 1;
+        ReplicaKey leader = replicaKey(leaderId, true);
+        ReplicaKey follower = replicaKey(localId + 2, true);
+        // localId is deliberately not a voter, so the local replica follows the leader as an observer
+        Set<Integer> voters = Set.of(leader.id(), follower.id());
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withElectedLeader(epoch, leader.id())
+            .withKip853Rpc(true)
+            .build();
+        context.assertElectedLeader(epoch, leader.id());
+
+        if (hasFetchedFromLeader) {
+            context.pollUntilRequest();
+            RaftRequest.Outbound fetchRequest = 
context.assertSentFetchRequest();
+            context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+
+            context.deliverResponse(
+                fetchRequest.correlationId(),
+                fetchRequest.destination(),
+                context.fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 
0L, Errors.NONE)
+            );
+        }
+
+        context.deliverRequest(context.preVoteRequest(epoch, follower, epoch, 
1));
+        context.pollUntilResponse();
+
+        // granted only while no fetch from the leader has succeeded yet
+        boolean voteGranted = !hasFetchedFromLeader;
+        context.assertSentVoteResponse(Errors.NONE, epoch, 
OptionalInt.of(leaderId), voteGranted);
+        assertTrue(context.client.quorum().isFollower());
+    }
+
+    /**
+     * A pre-vote whose epoch is older than the local epoch is rejected with FENCED_LEADER_EPOCH,
+     * and the local replica stays unattached with an unknown leader.
+     */
+    @Test
+    public void testHandleInvalidPreVoteRequestWithOlderEpoch() throws 
Exception {
+        int localId = randomReplicaId();
+        int epoch = 2;
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, true);
+        Set<Integer> voters = Set.of(localId, otherNodeKey.id());
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withUnknownLeader(epoch)
+            .withKip853Rpc(true)
+            .build();
+
+        context.deliverRequest(context.preVoteRequest(epoch - 1, otherNodeKey, 
epoch - 1, 1));
+        context.pollUntilResponse();
+
+        context.assertSentVoteResponse(Errors.FENCED_LEADER_EPOCH, epoch, 
OptionalInt.empty(), false);
+        context.assertUnknownLeader(epoch);
+    }
+
+    /**
+     * A leader never grants a pre-vote in its own epoch. It answers with its own id as leader and
+     * remains the elected leader.
+     */
+    @Test
+    public void testLeaderRejectPreVoteRequestOnSameEpoch() throws Exception {
+        int localId = randomReplicaId();
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, true);
+        Set<Integer> voters = Set.of(localId, otherNodeKey.id());
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withUnknownLeader(2)
+            .withKip853Rpc(true)
+            .build();
+
+        context.becomeLeader();
+        int leaderEpoch = context.currentEpoch();
+
+        context.deliverRequest(context.preVoteRequest(leaderEpoch, 
otherNodeKey, leaderEpoch, 1));
+
+        context.client.poll();
+
+        context.assertSentVoteResponse(Errors.NONE, leaderEpoch, 
OptionalInt.of(localId), false);
+        context.assertElectedLeader(leaderEpoch, localId);
+    }
+
+    /**
+     * Cluster id validation for pre-vote requests sent to a leader.
+     *
+     * <p>A matching or null cluster id reaches normal vote handling; the leader answers with
+     * Errors.NONE and does not grant the vote. An empty or unknown cluster id is rejected with
+     * INCONSISTENT_CLUSTER_ID.
+     */
+    @Test
+    public void testPreVoteRequestClusterIdValidation() throws Exception {
+        int localId = randomReplicaId();
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, true);
+        Set<Integer> voters = Set.of(localId, otherNodeKey.id());
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withKip853Rpc(true)
+            .build();
+
+        context.becomeLeader();
+        int epoch = context.currentEpoch();
+
+        // valid cluster id is accepted
+        context.deliverRequest(context.preVoteRequest(epoch, otherNodeKey, 
epoch, 0));
+        context.pollUntilResponse();
+        context.assertSentVoteResponse(Errors.NONE, epoch, 
OptionalInt.of(localId), false);
+
+        // NOTE(review): the trailing `true` argument of voteRequest presumably selects preVote=true — confirm in RaftClientTestContext
+        // null cluster id is accepted
+        context.deliverRequest(context.voteRequest(null, epoch, otherNodeKey, 
epoch, 0, true));
+        context.pollUntilResponse();
+        context.assertSentVoteResponse(Errors.NONE, epoch, 
OptionalInt.of(localId), false);
+
+        // empty cluster id is rejected
+        context.deliverRequest(context.voteRequest("", epoch, otherNodeKey, 
epoch, 0, true));
+        context.pollUntilResponse();
+        context.assertSentVoteResponse(Errors.INCONSISTENT_CLUSTER_ID);
+
+        // invalid cluster id is rejected
+        context.deliverRequest(context.voteRequest("invalid-uuid", epoch, 
otherNodeKey, epoch, 0, true));
+        context.pollUntilResponse();
+        context.assertSentVoteResponse(Errors.INCONSISTENT_CLUSTER_ID);
+    }
+
+    /**
+     * A pre-vote whose target voter key does not match the local replica is rejected with
+     * INVALID_VOTER_KEY. This covers both a wrong replica id and the correct id with a wrong
+     * directory id.
+     *
+     * <p>Both keys are derived from {@code localId}. localId is random, so hard-coded ids (10 and 0)
+     * only exercise the directory-id case when localId happens to equal 0, and collide with
+     * localId when it equals 10.
+     */
+    @Test
+    public void testInvalidVoterReplicaPreVoteRequest() throws Exception {
+        int localId = randomReplicaId();
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, true);
+        Set<Integer> voters = Set.of(localId, otherNodeKey.id());
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+            .withKip853Rpc(true)
+            .build();
+
+        context.becomeLeader();
+        int epoch = context.currentEpoch();
+
+        // invalid voter id is rejected: localId + 2 is neither the local replica nor any voter
+        context.deliverRequest(
+            context.voteRequest(
+                context.clusterId.toString(),
+                epoch,
+                otherNodeKey,
+                ReplicaKey.of(localId + 2, Uuid.randomUuid()),
+                epoch,
+                100,
+                true
+            )
+        );
+        context.pollUntilResponse();
+        context.assertSentVoteResponse(Errors.INVALID_VOTER_KEY, epoch, OptionalInt.of(localId), false);
+
+        // invalid voter directory id is rejected: correct local id, but a random directory id
+        // NOTE(review): assumes the test context assigns the local replica a non-random-colliding directory id
+        context.deliverRequest(
+            context.voteRequest(
+                context.clusterId.toString(),
+                epoch,
+                otherNodeKey,
+                ReplicaKey.of(localId, Uuid.randomUuid()),
+                epoch,
+                100,
+                true
+            )
+        );
+        context.pollUntilResponse();
+        context.assertSentVoteResponse(Errors.INVALID_VOTER_KEY, epoch, OptionalInt.of(localId), false);
+    }
+
+    /**
+     * A leader processes pre-vote requests from an observer but never grants them, despite the
+     * test's name. A stale epoch yields FENCED_LEADER_EPOCH. The current epoch yields Errors.NONE
+     * with voteGranted=false. Both responses name the local replica as leader.
+     */
+    @Test
+    public void testLeaderAcceptPreVoteFromObserver() throws Exception {
+        int localId = randomReplicaId();
+        int otherNodeId = localId + 1;
+        Set<Integer> voters = Set.of(localId, otherNodeId);
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withUnknownLeader(4)
+            .withKip853Rpc(true)
+            .build();
+
+        context.becomeLeader();
+        int epoch = context.currentEpoch();
+
+        // observer: its id is not in the voter set
+        ReplicaKey observerKey = replicaKey(localId + 2, true);
+        context.deliverRequest(context.preVoteRequest(epoch - 1, observerKey, 
0, 0));
+        context.client.poll();
+        context.assertSentVoteResponse(Errors.FENCED_LEADER_EPOCH, epoch, 
OptionalInt.of(localId), false);
+
+        context.deliverRequest(context.preVoteRequest(epoch, observerKey, 0, 
0));
+        context.client.poll();
+        context.assertSentVoteResponse(Errors.NONE, epoch, 
OptionalInt.of(localId), false);
+    }
+
+    /**
+     * A resigned leader grants a same-epoch pre-vote when the requester's log is up to date. A
+     * pre-vote with a higher epoch is granted and moves the replica to unattached.
+     */
+    @Test
+    public void testHandlePreVoteRequestAsResigned() throws Exception {
+        int localId = randomReplicaId();
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, true);
+        Set<Integer> voters = Set.of(localId, otherNodeKey.id());
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withKip853Rpc(true)
+            .build();
+        context.becomeLeader();
+        context.client.quorum().transitionToResigned(Collections.emptyList());
+        assertTrue(context.client.quorum().isResigned());
+
+        // resigned should grant pre-vote requests with the same epoch if log 
is up-to-date
+        int epoch = context.currentEpoch();
+        context.deliverRequest(context.preVoteRequest(epoch, otherNodeKey, 
epoch, 1));
+        context.pollUntilResponse();
+        context.assertSentVoteResponse(Errors.NONE, epoch, 
OptionalInt.of(localId), true);
+
+        // resigned will transition to unattached if pre-vote request has a 
higher epoch
+        context.deliverRequest(context.preVoteRequest(epoch + 1, otherNodeKey, 
epoch + 1, 1));
+        context.pollUntilResponse();
+        context.assertSentVoteResponse(Errors.NONE, epoch + 1, 
OptionalInt.of(-1), true);
+        assertTrue(context.client.quorum().isUnattached());
+    }
+
+    /**
+     * Malformed pre-vote requests are rejected with INVALID_REQUEST and leave the follower's epoch
+     * and elected leader unchanged. Three cases are covered: a negative end offset, a negative last
+     * epoch, and a last epoch greater than the request epoch.
+     */
+    @Test
+    public void testInvalidVoteRequest() throws Exception {
+        int localId = randomReplicaId();
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, true);
+        int epoch = 5;
+        Set<Integer> voters = Set.of(localId, otherNodeKey.id());
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withElectedLeader(epoch, otherNodeKey.id())
+            .withKip853Rpc(true)
+            .build();
+        assertEquals(epoch, context.currentEpoch());
+        context.assertElectedLeader(epoch, otherNodeKey.id());
+
+        // negative last epoch end offset
+        context.deliverRequest(context.preVoteRequest(epoch, otherNodeKey, 0, 
-5L));
+        context.pollUntilResponse();
+        context.assertSentVoteResponse(
+            Errors.INVALID_REQUEST,
+            epoch,
+            OptionalInt.of(otherNodeKey.id()),
+            false
+        );
+        assertEquals(epoch, context.currentEpoch());
+        context.assertElectedLeader(epoch, otherNodeKey.id());
+
+        // negative last epoch
+        context.deliverRequest(context.preVoteRequest(epoch, otherNodeKey, -1, 
0L));
+        context.pollUntilResponse();
+        context.assertSentVoteResponse(
+            Errors.INVALID_REQUEST,
+            epoch,
+            OptionalInt.of(otherNodeKey.id()),
+            false
+        );
+        assertEquals(epoch, context.currentEpoch());
+        context.assertElectedLeader(epoch, otherNodeKey.id());
+
+        // last epoch greater than the request's epoch
+        context.deliverRequest(context.preVoteRequest(epoch, otherNodeKey, 
epoch + 1, 0L));
+        context.pollUntilResponse();
+        context.assertSentVoteResponse(
+            Errors.INVALID_REQUEST,
+            epoch,
+            OptionalInt.of(otherNodeKey.id()),
+            false
+        );
+        assertEquals(epoch, context.currentEpoch());
+        context.assertElectedLeader(epoch, otherNodeKey.id());
+    }
+
+    /**
+     * A follower grants same-epoch pre-votes only until its first successful fetch from the leader.
+     * The vote response reports the known leader id in both phases, and the replica keeps
+     * following throughout.
+     */
+    @Test
+    public void testFollowerGrantsPreVoteIfHasNotFetchedYet() throws Exception {
+        int localId = randomReplicaId();
+        int epoch = 2;
+        ReplicaKey replica1 = replicaKey(localId + 1, true);
+        ReplicaKey replica2 = replicaKey(localId + 2, true);
+        Set<Integer> voters = Set.of(replica1.id(), replica2.id());
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+            .withElectedLeader(epoch, replica1.id())
+            .withKip853Rpc(true)
+            .build();
+        assertTrue(context.client.quorum().isFollower());
+
+        // We will grant PreVotes before fetching successfully from the leader; the response
+        // still reports the known leader id
+        context.deliverRequest(context.preVoteRequest(epoch, replica2, epoch, 1));
+        context.pollUntilResponse();
+
+        assertTrue(context.client.quorum().isFollower());
+        context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.of(replica1.id()), true);
+
+        // After fetching successfully from the leader once, we will no longer grant PreVotes
+        context.pollUntilRequest();
+        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+
+        context.deliverResponse(
+            fetchRequest.correlationId(),
+            fetchRequest.destination(),
+            context.fetchResponse(epoch, replica1.id(), MemoryRecords.EMPTY, 0L, Errors.NONE)
+        );
+        assertTrue(context.client.quorum().isFollower());
+
+        context.deliverRequest(context.preVoteRequest(epoch, replica2, epoch, 1));
+        context.pollUntilResponse();
+
+        assertTrue(context.client.quorum().isFollower());
+        // the same pre-vote must now be rejected, while still naming the leader
+        context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.of(replica1.id()), false);
+    }
+
+    /**
+     * An unattached voter rejects pre-votes in three situations, and none of them changes its
+     * unattached state:
+     * <ul>
+     *   <li>the request epoch is stale (FENCED_LEADER_EPOCH);</li>
+     *   <li>the requester's log ends at an older offset within the same last epoch;</li>
+     *   <li>the requester's last epoch is older than the local one.</li>
+     * </ul>
+     *
+     * <p>The two log cases are sent separately. A single request with both an older last epoch and
+     * an older offset would be rejected on the epoch alone and never exercise the offset
+     * comparison.
+     */
+    @Test
+    public void testRejectPreVoteIfRemoteLogIsNotUpToDate() throws Exception {
+        int localId = randomReplicaId();
+        int epoch = 2;
+        ReplicaKey replica1 = replicaKey(localId + 1, true);
+        ReplicaKey replica2 = replicaKey(localId + 2, true);
+        Set<Integer> voters = Set.of(localId, replica1.id(), replica2.id());
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+            .withUnknownLeader(epoch)
+            .withKip853Rpc(true)
+            .appendToLog(epoch, Arrays.asList("a", "b", "c"))
+            .build();
+        assertTrue(context.client.quorum().isUnattached());
+        assertEquals(3, context.log.endOffset().offset());
+
+        // older epoch
+        context.deliverRequest(context.preVoteRequest(epoch - 1, replica1, epoch - 1, 0));
+        context.pollUntilResponse();
+
+        assertTrue(context.client.quorum().isUnattached());
+        context.assertSentVoteResponse(Errors.FENCED_LEADER_EPOCH, epoch, OptionalInt.empty(), false);
+
+        // same last epoch as the local log, but an older end offset
+        context.deliverRequest(context.preVoteRequest(epoch, replica1, epoch, context.log.endOffset().offset() - 1));
+        context.pollUntilResponse();
+
+        assertTrue(context.client.quorum().isUnattached());
+        context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.empty(), false);
+
+        // older last epoch, even though the end offset matches the local log
+        context.deliverRequest(context.preVoteRequest(epoch, replica1, epoch - 1, context.log.endOffset().offset()));
+        context.pollUntilResponse();
+
+        assertTrue(context.client.quorum().isUnattached());
+        context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.empty(), false);
+    }
+}
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java 
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index 90057be0bf2..ab6b698cc38 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -2246,17 +2246,17 @@ public class KafkaRaftClientTest {
         context.assertSentVoteResponse(Errors.NONE, epoch, 
OptionalInt.of(localId), false);
 
         // null cluster id is accepted
-        context.deliverRequest(context.voteRequest(null, epoch, otherNodeKey, 
0, 0));
+        context.deliverRequest(context.voteRequest(null, epoch, otherNodeKey, 
0, 0, false));
         context.pollUntilResponse();
         context.assertSentVoteResponse(Errors.NONE, epoch, 
OptionalInt.of(localId), false);
 
         // empty cluster id is rejected
-        context.deliverRequest(context.voteRequest("", epoch, otherNodeKey, 0, 
0));
+        context.deliverRequest(context.voteRequest("", epoch, otherNodeKey, 0, 
0, false));
         context.pollUntilResponse();
         context.assertSentVoteResponse(Errors.INCONSISTENT_CLUSTER_ID);
 
         // invalid cluster id is rejected
-        context.deliverRequest(context.voteRequest("invalid-uuid", epoch, 
otherNodeKey, 0, 0));
+        context.deliverRequest(context.voteRequest("invalid-uuid", epoch, 
otherNodeKey, 0, 0, false));
         context.pollUntilResponse();
         context.assertSentVoteResponse(Errors.INCONSISTENT_CLUSTER_ID);
     }
@@ -2282,7 +2282,8 @@ public class KafkaRaftClientTest {
                 otherNodeKey,
                 ReplicaKey.of(10, Uuid.randomUuid()),
                 epoch,
-                100
+                100,
+                false
             )
         );
         context.pollUntilResponse();
@@ -2296,7 +2297,8 @@ public class KafkaRaftClientTest {
                 otherNodeKey,
                 ReplicaKey.of(0, Uuid.randomUuid()),
                 epoch,
-                100
+                100,
+                false
             )
         );
         context.pollUntilResponse();
@@ -4492,7 +4494,7 @@ public class KafkaRaftClientTest {
         return ReplicaKey.of(id, directoryId);
     }
 
-    private static int randomReplicaId() {
+    static int randomReplicaId() {
         return ThreadLocalRandom.current().nextInt(1025);
     }
 }
diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java 
b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
index 696a926f50a..ce3cef11ef8 100644
--- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
@@ -575,15 +575,13 @@ public class LeaderStateTest {
             1
         );
 
-        assertFalse(
-            state.canGrantVote(ReplicaKey.of(1, ReplicaKey.NO_DIRECTORY_ID), 
isLogUpToDate)
-        );
-        assertFalse(
-            state.canGrantVote(ReplicaKey.of(2, ReplicaKey.NO_DIRECTORY_ID), 
isLogUpToDate)
-        );
-        assertFalse(
-            state.canGrantVote(ReplicaKey.of(3, ReplicaKey.NO_DIRECTORY_ID), 
isLogUpToDate)
-        );
+        assertFalse(state.canGrantVote(ReplicaKey.of(1, 
ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate, true));
+        assertFalse(state.canGrantVote(ReplicaKey.of(2, 
ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate, true));
+        assertFalse(state.canGrantVote(ReplicaKey.of(3, 
ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate, true));
+
+        assertFalse(state.canGrantVote(ReplicaKey.of(1, 
ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate, false));
+        assertFalse(state.canGrantVote(ReplicaKey.of(2, 
ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate, false));
+        assertFalse(state.canGrantVote(ReplicaKey.of(3, 
ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate, false));
     }
 
     @ParameterizedTest
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java 
b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index f338b1df081..1343a57da1c 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -768,11 +768,11 @@ public final class RaftClientTestContext {
 
         VoteResponseData.PartitionData partitionResponse = 
response.topics().get(0).partitions().get(0);
 
-        String voterIdDebugLog = "Leader Id: " + leaderId +
+        String leaderIdDebugLog = "Leader Id: " + leaderId +
             " Partition response leader Id: " + partitionResponse.leaderId();
-        assertEquals(voteGranted, partitionResponse.voteGranted(), 
voterIdDebugLog);
-        assertEquals(error, Errors.forCode(partitionResponse.errorCode()), 
voterIdDebugLog);
-        assertEquals(leaderId.orElse(-1), partitionResponse.leaderId());
+        assertEquals(voteGranted, partitionResponse.voteGranted());
+        assertEquals(error, Errors.forCode(partitionResponse.errorCode()));
+        assertEquals(leaderId.orElse(-1), partitionResponse.leaderId(), 
leaderIdDebugLog);
         assertEquals(epoch, partitionResponse.leaderEpoch());
 
         if (kip853Rpc && leaderId.isPresent()) {
@@ -797,8 +797,8 @@ public final class RaftClientTestContext {
                 VoteRequestData request = (VoteRequestData) raftMessage.data();
                 VoteRequestData.PartitionData partitionRequest = 
unwrap(request);
 
-                assertEquals(epoch, partitionRequest.candidateEpoch());
-                assertEquals(localIdOrThrow(), partitionRequest.candidateId());
+                assertEquals(epoch, partitionRequest.replicaEpoch());
+                assertEquals(localIdOrThrow(), partitionRequest.replicaId());
                 assertEquals(lastEpoch, partitionRequest.lastOffsetEpoch());
                 assertEquals(lastEpochOffset, partitionRequest.lastOffset());
                 voteRequests.add(raftMessage);
@@ -1440,7 +1440,24 @@ public final class RaftClientTestContext {
             epoch,
             candidateKey,
             lastEpoch,
-            lastEpochOffset
+            lastEpochOffset,
+            false
+        );
+    }
+
+    VoteRequestData preVoteRequest(
+        int epoch,
+        ReplicaKey candidateKey,
+        int lastEpoch,
+        long lastEpochOffset
+    ) {
+        return voteRequest(
+            clusterId,
+            epoch,
+            candidateKey,
+            lastEpoch,
+            lastEpochOffset,
+            true
         );
     }
 
@@ -1449,7 +1466,8 @@ public final class RaftClientTestContext {
         int epoch,
         ReplicaKey candidateKey,
         int lastEpoch,
-        long lastEpochOffset
+        long lastEpochOffset,
+        boolean preVote
     ) {
         ReplicaKey localReplicaKey = kip853Rpc ?
             ReplicaKey.of(localIdOrThrow(), localDirectoryId) :
@@ -1461,7 +1479,8 @@ public final class RaftClientTestContext {
             candidateKey,
             localReplicaKey,
             lastEpoch,
-            lastEpochOffset
+            lastEpochOffset,
+            preVote
         );
     }
 
@@ -1471,7 +1490,8 @@ public final class RaftClientTestContext {
         ReplicaKey candidateKey,
         ReplicaKey voterKey,
         int lastEpoch,
-        long lastEpochOffset
+        long lastEpochOffset,
+        boolean preVote
     ) {
         return RaftUtil.singletonVoteRequest(
                 metadataPartition,
@@ -1480,7 +1500,8 @@ public final class RaftClientTestContext {
                 candidateKey,
                 voterKey,
                 lastEpoch,
-                lastEpochOffset
+                lastEpochOffset,
+                preVote
         );
     }
 
@@ -1818,9 +1839,9 @@ public final class RaftClientTestContext {
         }
     }
 
-    private short voteRpcVersion() {
+    short voteRpcVersion() {
         if (kip853Rpc) {
-            return 1;
+            return ApiKeys.VOTE.latestVersion();
         } else {
             return 0;
         }
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java 
b/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java
index 5e1d234c279..ca100fa5239 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java
@@ -75,6 +75,8 @@ public class RaftUtilTest {
     private final ListenerName listenerName = 
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT);
     private final InetSocketAddress address = 
InetSocketAddress.createUnresolved("localhost", 9990);
     private final String clusterId = "I4ZmrWqfT2e-upky_4fdPA";
+    private static final Uuid TEST_DIRECTORY_ID1 = Uuid.randomUuid();
+    private static final Uuid TEST_DIRECTORY_ID2 = Uuid.randomUuid();
 
     @Test
     public void testErrorResponse() {
@@ -187,13 +189,26 @@ public class RaftUtilTest {
         return Stream.of(
                 Arguments.of((short) 0,
                         
"{\"clusterId\":\"I4ZmrWqfT2e-upky_4fdPA\",\"topics\":[{\"topicName\":\"topic\","
 +
-                            
"\"partitions\":[{\"partitionIndex\":1,\"candidateEpoch\":1,\"candidateId\":1," 
+
+                            
"\"partitions\":[{\"partitionIndex\":1,\"replicaEpoch\":1,\"replicaId\":1," +
                             
"\"lastOffsetEpoch\":1000,\"lastOffset\":1000}]}]}"),
                 Arguments.of((short) 1,
                         
"{\"clusterId\":\"I4ZmrWqfT2e-upky_4fdPA\",\"voterId\":2,\"topics\":[{" +
-                            
"\"topicName\":\"topic\",\"partitions\":[{\"partitionIndex\":1,\"candidateEpoch\":1,"
 +
-                            
"\"candidateId\":1,\"candidateDirectoryId\":\"AAAAAAAAAAAAAAAAAAAAAQ\"," +
-                            
"\"voterDirectoryId\":\"AAAAAAAAAAAAAAAAAAAAAQ\",\"lastOffsetEpoch\":1000,\"lastOffset\":1000}]}]}")
+                            
"\"topicName\":\"topic\",\"partitions\":[{\"partitionIndex\":1,\"replicaEpoch\":1,"
 +
+                            "\"replicaId\":1,\"replicaDirectoryId\":\"" + 
TEST_DIRECTORY_ID1 + "\"," +
+                            "\"voterDirectoryId\":\"" + TEST_DIRECTORY_ID2 + 
"\",\"lastOffsetEpoch\":1000," +
+                            "\"lastOffset\":1000}]}]}"),
+                Arguments.of((short) 2,
+                        
"{\"clusterId\":\"I4ZmrWqfT2e-upky_4fdPA\",\"voterId\":2,\"topics\":[{" +
+                            
"\"topicName\":\"topic\",\"partitions\":[{\"partitionIndex\":1,\"replicaEpoch\":1,"
 +
+                            "\"replicaId\":1,\"replicaDirectoryId\":\"" + 
TEST_DIRECTORY_ID1 + "\"," +
+                            "\"voterDirectoryId\":\"" + TEST_DIRECTORY_ID2 + 
"\",\"lastOffsetEpoch\":1000," +
+                            "\"lastOffset\":1000,\"preVote\":true}]}]}"),
+                Arguments.of((short) 2,
+                        
"{\"clusterId\":\"I4ZmrWqfT2e-upky_4fdPA\",\"voterId\":2,\"topics\":[{" +
+                            
"\"topicName\":\"topic\",\"partitions\":[{\"partitionIndex\":1,\"replicaEpoch\":1,"
 +
+                            "\"replicaId\":1,\"replicaDirectoryId\":\"" + 
TEST_DIRECTORY_ID1 + "\"," +
+                            "\"voterDirectoryId\":\"" + TEST_DIRECTORY_ID2 + 
"\",\"lastOffsetEpoch\":1000," +
+                            "\"lastOffset\":1000,\"preVote\":true}]}]}")
         );
     }
 
@@ -203,6 +218,10 @@ public class RaftUtilTest {
                         
"{\"errorCode\":0,\"topics\":[{\"topicName\":\"topic\",\"partitions\":[{" +
                             
"\"partitionIndex\":0,\"errorCode\":0,\"leaderId\":1,\"leaderEpoch\":1,\"voteGranted\":true}]}]}"),
                 Arguments.of((short) 1,
+                        
"{\"errorCode\":0,\"topics\":[{\"topicName\":\"topic\",\"partitions\":[{" +
+                            
"\"partitionIndex\":0,\"errorCode\":0,\"leaderId\":1,\"leaderEpoch\":1,\"voteGranted\":true}]}],"
 +
+                            
"\"nodeEndpoints\":[{\"nodeId\":1,\"host\":\"localhost\",\"port\":9990}]}"),
+                Arguments.of((short) 2,
                         
"{\"errorCode\":0,\"topics\":[{\"topicName\":\"topic\",\"partitions\":[{" +
                             
"\"partitionIndex\":0,\"errorCode\":0,\"leaderId\":1,\"leaderEpoch\":1,\"voteGranted\":true}]}],"
 +
                             
"\"nodeEndpoints\":[{\"nodeId\":1,\"host\":\"localhost\",\"port\":9990}]}")
@@ -377,18 +396,19 @@ public class RaftUtilTest {
     @ParameterizedTest
     @MethodSource("voteRequestTestCases")
     public void testSingletonVoteRequestForAllVersion(final short version, 
final String expectedJson) {
-        int candidateEpoch = 1;
+        int replicaEpoch = 1;
         int lastEpoch = 1000;
         long lastEpochOffset = 1000;
 
         VoteRequestData voteRequestData = RaftUtil.singletonVoteRequest(
-                topicPartition,
-                clusterId,
-                candidateEpoch,
-                ReplicaKey.of(1, Uuid.ONE_UUID),
-                ReplicaKey.of(2, Uuid.ONE_UUID),
-                lastEpoch,
-                lastEpochOffset
+            topicPartition,
+            clusterId,
+            replicaEpoch,
+            ReplicaKey.of(1, TEST_DIRECTORY_ID1),
+            ReplicaKey.of(2, TEST_DIRECTORY_ID2),
+            lastEpoch,
+            lastEpochOffset,
+            version >= 2
         );
         JsonNode json = VoteRequestDataJsonConverter.write(voteRequestData, 
version);
         assertEquals(expectedJson, json.toString());
diff --git a/raft/src/test/java/org/apache/kafka/raft/ResignedStateTest.java 
b/raft/src/test/java/org/apache/kafka/raft/ResignedStateTest.java
index a21c7b5b9aa..ba867a01e0d 100644
--- a/raft/src/test/java/org/apache/kafka/raft/ResignedStateTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/ResignedStateTest.java
@@ -89,9 +89,22 @@ class ResignedStateTest {
     public void testGrantVote(boolean isLogUpToDate) {
         ResignedState state = newResignedState(Set.of(1, 2, 3));
 
-        assertFalse(state.canGrantVote(ReplicaKey.of(1, 
ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate));
-        assertFalse(state.canGrantVote(ReplicaKey.of(2, 
ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate));
-        assertFalse(state.canGrantVote(ReplicaKey.of(3, 
ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate));
+        assertEquals(
+            isLogUpToDate,
+            state.canGrantVote(ReplicaKey.of(1, ReplicaKey.NO_DIRECTORY_ID), 
isLogUpToDate, true)
+        );
+        assertEquals(
+            isLogUpToDate,
+            state.canGrantVote(ReplicaKey.of(2, ReplicaKey.NO_DIRECTORY_ID), 
isLogUpToDate, true)
+        );
+        assertEquals(
+            isLogUpToDate,
+            state.canGrantVote(ReplicaKey.of(3, ReplicaKey.NO_DIRECTORY_ID), 
isLogUpToDate, true)
+        );
+
+        assertFalse(state.canGrantVote(ReplicaKey.of(1, 
ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate, false));
+        assertFalse(state.canGrantVote(ReplicaKey.of(2, 
ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate, false));
+        assertFalse(state.canGrantVote(ReplicaKey.of(3, 
ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate, false));
     }
 
     @Test
diff --git a/raft/src/test/java/org/apache/kafka/raft/UnattachedStateTest.java 
b/raft/src/test/java/org/apache/kafka/raft/UnattachedStateTest.java
index 0870894067e..d4131e77c05 100644
--- a/raft/src/test/java/org/apache/kafka/raft/UnattachedStateTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/UnattachedStateTest.java
@@ -82,15 +82,28 @@ public class UnattachedStateTest {
 
         assertEquals(
             isLogUpToDate,
-            state.canGrantVote(ReplicaKey.of(1, ReplicaKey.NO_DIRECTORY_ID), 
isLogUpToDate)
+            state.canGrantVote(ReplicaKey.of(1, ReplicaKey.NO_DIRECTORY_ID), 
isLogUpToDate, true)
         );
         assertEquals(
             isLogUpToDate,
-            state.canGrantVote(ReplicaKey.of(2, ReplicaKey.NO_DIRECTORY_ID), 
isLogUpToDate)
+            state.canGrantVote(ReplicaKey.of(2, ReplicaKey.NO_DIRECTORY_ID), 
isLogUpToDate, true)
         );
         assertEquals(
             isLogUpToDate,
-            state.canGrantVote(ReplicaKey.of(3, ReplicaKey.NO_DIRECTORY_ID), 
isLogUpToDate)
+            state.canGrantVote(ReplicaKey.of(3, ReplicaKey.NO_DIRECTORY_ID), 
isLogUpToDate, true)
+        );
+
+        assertEquals(
+            isLogUpToDate,
+            state.canGrantVote(ReplicaKey.of(1, ReplicaKey.NO_DIRECTORY_ID), 
isLogUpToDate, false)
+        );
+        assertEquals(
+            isLogUpToDate,
+            state.canGrantVote(ReplicaKey.of(2, ReplicaKey.NO_DIRECTORY_ID), 
isLogUpToDate, false)
+        );
+        assertEquals(
+            isLogUpToDate,
+            state.canGrantVote(ReplicaKey.of(3, ReplicaKey.NO_DIRECTORY_ID), 
isLogUpToDate, false)
         );
     }
 
@@ -112,9 +125,24 @@ public class UnattachedStateTest {
         // Check that the leader is persisted if the leader is known
         assertEquals(ElectionState.withElectedLeader(epoch, leaderId, voters), 
state.election());
 
-        // Check that the replica rejects all votes request if the leader is 
known
-        assertFalse(state.canGrantVote(ReplicaKey.of(1, 
ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate));
-        assertFalse(state.canGrantVote(ReplicaKey.of(2, 
ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate));
-        assertFalse(state.canGrantVote(ReplicaKey.of(3, 
ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate));
+        // Check that the replica can grant PreVotes if the log is up-to-date, 
even if the last leader is known
+        // This is because nodes in Unattached have not successfully fetched 
from the leader yet
+        assertEquals(
+            isLogUpToDate,
+            state.canGrantVote(ReplicaKey.of(1, ReplicaKey.NO_DIRECTORY_ID), 
isLogUpToDate, true)
+        );
+        assertEquals(
+            isLogUpToDate,
+            state.canGrantVote(ReplicaKey.of(2, ReplicaKey.NO_DIRECTORY_ID), 
isLogUpToDate, true)
+        );
+        assertEquals(
+            isLogUpToDate,
+            state.canGrantVote(ReplicaKey.of(3, ReplicaKey.NO_DIRECTORY_ID), 
isLogUpToDate, true)
+        );
+
+        // Check that the replica rejects all standard votes request if the 
leader is known
+        assertFalse(state.canGrantVote(ReplicaKey.of(1, 
ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate, false));
+        assertFalse(state.canGrantVote(ReplicaKey.of(2, 
ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate, false));
+        assertFalse(state.canGrantVote(ReplicaKey.of(3, 
ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate, false));
     }
 }
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/UnattachedStateWithVoteTest.java 
b/raft/src/test/java/org/apache/kafka/raft/UnattachedStateWithVoteTest.java
index 2daffe84e7c..c7ec0292e40 100644
--- a/raft/src/test/java/org/apache/kafka/raft/UnattachedStateWithVoteTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/UnattachedStateWithVoteTest.java
@@ -83,35 +83,67 @@ class UnattachedStateWithVoteTest {
     public void testCanGrantVoteWithoutDirectoryId(boolean isLogUpToDate) {
         UnattachedState state = 
newUnattachedVotedState(ReplicaKey.NO_DIRECTORY_ID);
 
-        assertTrue(
-            state.canGrantVote(ReplicaKey.of(votedId, 
ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate)
+        assertEquals(
+            isLogUpToDate,
+            state.canGrantVote(ReplicaKey.of(votedId, 
ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate, true)
         );
-        assertTrue(
-            state.canGrantVote(
-                ReplicaKey.of(votedId, Uuid.randomUuid()),
-                isLogUpToDate
-            )
+        assertTrue(state.canGrantVote(ReplicaKey.of(votedId, 
ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate, false));
+
+        assertEquals(
+            isLogUpToDate,
+            state.canGrantVote(ReplicaKey.of(votedId, Uuid.randomUuid()), 
isLogUpToDate, true)
         );
+        assertTrue(state.canGrantVote(ReplicaKey.of(votedId, 
Uuid.randomUuid()), isLogUpToDate, false));
 
-        assertFalse(
-            state.canGrantVote(ReplicaKey.of(votedId + 1, 
ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate)
+        // Can grant PreVote to other replicas even if we have granted a 
standard vote to another replica
+        assertEquals(
+            isLogUpToDate,
+            state.canGrantVote(ReplicaKey.of(votedId + 1, 
ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate, true)
         );
+        assertFalse(state.canGrantVote(ReplicaKey.of(votedId + 1, 
ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate, false));
     }
 
-    @Test
-    void testCanGrantVoteWithDirectoryId() {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testCanGrantVoteWithDirectoryId(boolean isLogUpToDate) {
         Uuid votedDirectoryId = Uuid.randomUuid();
         UnattachedState state = newUnattachedVotedState(votedDirectoryId);
 
-        assertTrue(state.canGrantVote(ReplicaKey.of(votedId, 
votedDirectoryId), false));
+        // Same voterKey
+        // We can reject PreVote for a replica we have already granted a 
standard vote to if their log is behind
+        assertEquals(
+            isLogUpToDate,
+            state.canGrantVote(ReplicaKey.of(votedId, votedDirectoryId), 
isLogUpToDate, true)
+        );
+        assertTrue(state.canGrantVote(ReplicaKey.of(votedId, 
votedDirectoryId), isLogUpToDate, false));
 
-        assertFalse(
-            state.canGrantVote(ReplicaKey.of(votedId, Uuid.randomUuid()), 
false)
+        // Different directoryId
+        // We can grant PreVote for a replica we have already granted a 
standard vote to if their log is up-to-date,
+        // even if the directoryId is different
+        assertEquals(
+            isLogUpToDate,
+            state.canGrantVote(ReplicaKey.of(votedId, Uuid.randomUuid()), 
isLogUpToDate, true)
+        );
+        assertFalse(state.canGrantVote(ReplicaKey.of(votedId, 
Uuid.randomUuid()), isLogUpToDate, false));
+
+        // Missing directoryId
+        assertEquals(
+            isLogUpToDate,
+            state.canGrantVote(ReplicaKey.of(votedId, 
ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate, true)
         );
-        assertFalse(state.canGrantVote(ReplicaKey.of(votedId, 
ReplicaKey.NO_DIRECTORY_ID), false));
+        assertFalse(state.canGrantVote(ReplicaKey.of(votedId, 
ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate, false));
 
-        assertFalse(state.canGrantVote(ReplicaKey.of(votedId + 1, 
votedDirectoryId), false));
-        assertFalse(state.canGrantVote(ReplicaKey.of(votedId + 1, 
ReplicaKey.NO_DIRECTORY_ID), false));
+        // Different voterId
+        assertEquals(
+            isLogUpToDate,
+            state.canGrantVote(ReplicaKey.of(votedId + 1, votedDirectoryId), 
isLogUpToDate, true)
+        );
+        assertEquals(
+            isLogUpToDate,
+            state.canGrantVote(ReplicaKey.of(votedId + 1, 
ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate, true)
+        );
+        assertFalse(state.canGrantVote(ReplicaKey.of(votedId + 1, 
votedDirectoryId), true, false));
+        assertFalse(state.canGrantVote(ReplicaKey.of(votedId + 1, 
ReplicaKey.NO_DIRECTORY_ID), true, false));
     }
 
     @Test

Reply via email to