This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.9 by this push:
     new 6390671a0d4 KAFKA-17030; Unattached voters will fetch from bootstrap 
servers (#17352)
6390671a0d4 is described below

commit 6390671a0d432f6e61fb7660e1255d77df8c66a4
Author: Alyssa Huang <[email protected]>
AuthorDate: Wed Dec 11 08:38:14 2024 -0800

    KAFKA-17030; Unattached voters will fetch from bootstrap servers (#17352)
    
    Because the set of voters is dynamic (KIP-853), it is possible for a 
replica to believe it is a voter while the current leader doesn't have that 
replica in the voter set. In this replicated state, the leader will not send 
BeginQuorumEpoch requests to such a replica. This means that such replicas will 
not be able to discover the leader.
    
    This change will help Unattached replicas rediscover the leader by sending 
Fetch requests to the bootstrap servers.
    Followers have a similar issue - if they are unable to communicate with the 
leader they should try contacting the bootstrap servers.
    
    Reviewers: José Armando García Sancio <[email protected]>
---
 .../org/apache/kafka/raft/KafkaRaftClient.java     |  52 ++++----
 .../org/apache/kafka/raft/KafkaRaftClientTest.java | 134 ++++++++++++++++++++-
 2 files changed, 156 insertions(+), 30 deletions(-)

diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java 
b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 51aa5e59f2f..ffed1cfc387 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -2755,7 +2755,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
             .setReplicaState(new 
FetchRequestData.ReplicaState().setReplicaId(quorum.localIdOrSentinel()));
     }
 
-    private long maybeSendAnyVoterFetch(long currentTimeMs) {
+    private long maybeSendFetchToAnyBootstrap(long currentTimeMs) {
         Optional<Node> readyNode = 
requestManager.findReadyBootstrapServer(currentTimeMs);
         if (readyNode.isPresent()) {
             return maybeSendRequest(
@@ -3045,7 +3045,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
             }
             state.resetUpdateVoterPeriod(currentTimeMs);
         } else {
-            backoffMs = maybeSendFetchOrFetchSnapshot(state, currentTimeMs);
+            backoffMs = maybeSendFetchToBestNode(state, currentTimeMs);
         }
 
         return Math.min(
@@ -3059,28 +3059,30 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
 
     private long pollFollowerAsObserver(FollowerState state, long 
currentTimeMs) {
         if (state.hasFetchTimeoutExpired(currentTimeMs)) {
-            return maybeSendAnyVoterFetch(currentTimeMs);
+            return maybeSendFetchToAnyBootstrap(currentTimeMs);
         } else {
-            final long backoffMs;
-
-            // If the current leader is backing off due to some failure or if 
the
-            // request has timed out, then we attempt to send the Fetch to 
another
-            // voter in order to discover if there has been a leader change.
-            Node leaderNode = state.leaderNode(channel.listenerName());
-            if (requestManager.hasRequestTimedOut(leaderNode, currentTimeMs)) {
-                // Once the request has timed out backoff the connection
-                requestManager.reset(leaderNode);
-                backoffMs = maybeSendAnyVoterFetch(currentTimeMs);
-            } else if (requestManager.isBackingOff(leaderNode, currentTimeMs)) 
{
-                backoffMs = maybeSendAnyVoterFetch(currentTimeMs);
-            } else if (!requestManager.hasAnyInflightRequest(currentTimeMs)) {
-                backoffMs = maybeSendFetchOrFetchSnapshot(state, 
currentTimeMs);
-            } else {
-                backoffMs = 
requestManager.backoffBeforeAvailableBootstrapServer(currentTimeMs);
-            }
+            return maybeSendFetchToBestNode(state, currentTimeMs);
+        }
+    }
 
-            return Math.min(backoffMs, 
state.remainingFetchTimeMs(currentTimeMs));
+    private long maybeSendFetchToBestNode(FollowerState state, long 
currentTimeMs) {
+        // If the current leader is backing off due to some failure or if the
+        // request has timed out, then we attempt to send the Fetch to another
+        // voter in order to discover if there has been a leader change.
+        final long backoffMs;
+        Node leaderNode = state.leaderNode(channel.listenerName());
+        if (requestManager.hasRequestTimedOut(leaderNode, currentTimeMs)) {
+            // Once the request has timed out backoff the connection
+            requestManager.reset(leaderNode);
+            backoffMs = maybeSendFetchToAnyBootstrap(currentTimeMs);
+        } else if (requestManager.isBackingOff(leaderNode, currentTimeMs)) {
+            backoffMs = maybeSendFetchToAnyBootstrap(currentTimeMs);
+        } else if (!requestManager.hasAnyInflightRequest(currentTimeMs)) {
+            backoffMs = maybeSendFetchOrFetchSnapshot(state, currentTimeMs);
+        } else {
+            backoffMs = 
requestManager.backoffBeforeAvailableBootstrapServer(currentTimeMs);
         }
+        return Math.min(backoffMs, state.remainingFetchTimeMs(currentTimeMs));
     }
 
     private long maybeSendFetchOrFetchSnapshot(FollowerState state, long 
currentTimeMs) {
@@ -3125,7 +3127,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
         if (quorum.isVoter()) {
             return pollUnattachedAsVoter(state, currentTimeMs);
         } else {
-            return pollUnattachedAsObserver(state, currentTimeMs);
+            return pollUnattachedCommon(state, currentTimeMs);
         }
     }
 
@@ -3139,12 +3141,12 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
             transitionToCandidate(currentTimeMs);
             return 0L;
         } else {
-            return state.remainingElectionTimeMs(currentTimeMs);
+            return pollUnattachedCommon(state, currentTimeMs);
         }
     }
 
-    private long pollUnattachedAsObserver(UnattachedState state, long 
currentTimeMs) {
-        long fetchBackoffMs = maybeSendAnyVoterFetch(currentTimeMs);
+    private long pollUnattachedCommon(UnattachedState state, long 
currentTimeMs) {
+        long fetchBackoffMs = maybeSendFetchToAnyBootstrap(currentTimeMs);
         return Math.min(fetchBackoffMs, 
state.remainingElectionTimeMs(currentTimeMs));
     }
 
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java 
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index 6e5048e9e64..c1c5857148c 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -851,12 +851,33 @@ public class KafkaRaftClientTest {
             .build();
 
         context.assertUnknownLeader(0);
-        context.time.sleep(2L * context.electionTimeoutMs());
+        context.pollUntilRequest();
+        RaftRequest.Outbound request = context.assertSentFetchRequest(0, 0L, 
0);
+        assertTrue(context.client.quorum().isUnattached());
+        assertTrue(context.client.quorum().isVoter());
+
+        // receives a fetch response which does not specify who the leader is
+        context.time.sleep(context.electionTimeoutMs() / 2);
+        context.deliverResponse(
+            request.correlationId(),
+            request.destination(),
+            context.fetchResponse(0, -1, MemoryRecords.EMPTY, -1, 
Errors.NOT_LEADER_OR_FOLLOWER)
+        );
+
+        // should remain unattached voter
+        context.client.poll();
+        assertTrue(context.client.quorum().isUnattached());
+        assertTrue(context.client.quorum().isVoter());
+
+        // after election timeout should become candidate
+        context.time.sleep(context.electionTimeoutMs() * 2L);
+        context.pollUntilRequest();
+        assertTrue(context.client.quorum().isCandidate());
 
         context.pollUntilRequest();
         context.assertVotedCandidate(1, localId);
 
-        RaftRequest.Outbound request = context.assertSentVoteRequest(1, 0, 0L, 
1);
+        request = context.assertSentVoteRequest(1, 0, 0L, 1);
         context.deliverResponse(
             request.correlationId(),
             request.destination(),
@@ -1821,6 +1842,37 @@ public class KafkaRaftClientTest {
         assertEquals(0, context.channel.drainSendQueue().size());
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = { true, false })
+    public void 
testUnattachedAsVoterCanBecomeFollowerAfterFindingLeader(boolean withKip853Rpc) 
throws Exception {
+        int localId = randomReplicaId();
+        int otherNodeId = localId + 1;
+        int leaderNodeId = localId + 2;
+        int epoch = 5;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId, leaderNodeId);
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withUnknownLeader(epoch)
+            .withKip853Rpc(withKip853Rpc)
+            .build();
+
+        context.pollUntilRequest();
+        RaftRequest.Outbound request = context.assertSentFetchRequest(epoch, 
0L, 0);
+        assertTrue(context.client.quorum().isUnattached());
+        assertTrue(context.client.quorum().isVoter());
+
+        // receives a fetch response specifying who the leader is
+        Errors responseError = (request.destination().id() == otherNodeId) ? 
Errors.NOT_LEADER_OR_FOLLOWER : Errors.NONE;
+        context.deliverResponse(
+            request.correlationId(),
+            request.destination(),
+            context.fetchResponse(epoch, leaderNodeId, MemoryRecords.EMPTY, 
0L, responseError)
+        );
+
+        context.client.poll();
+        assertTrue(context.client.quorum().isFollower());
+    }
+
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testInitializeObserverNoPreviousState(boolean withKip853Rpc) 
throws Exception {
@@ -2669,6 +2721,80 @@ public class KafkaRaftClientTest {
         context.assertElectedLeader(epoch, voter3);
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = { true, false })
+    public void 
testFollowerLeaderRediscoveryAfterBrokerNotAvailableError(boolean 
withKip853Rpc) throws Exception {
+        int localId = randomReplicaId();
+        int leaderId = localId + 1;
+        int otherNodeId = localId + 2;
+        int epoch = 5;
+        Set<Integer> voters = Utils.mkSet(leaderId, localId, otherNodeId);
+        List<InetSocketAddress> bootstrapServers = voters
+            .stream()
+            .map(RaftClientTestContext::mockAddress)
+            .collect(Collectors.toList());
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withBootstrapServers(Optional.of(bootstrapServers))
+            .withKip853Rpc(withKip853Rpc)
+            .withElectedLeader(epoch, leaderId)
+            .build();
+
+        context.pollUntilRequest();
+        RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest();
+        assertEquals(leaderId, fetchRequest1.destination().id());
+        context.assertFetchRequestData(fetchRequest1, epoch, 0L, 0);
+
+        context.deliverResponse(
+            fetchRequest1.correlationId(),
+            fetchRequest1.destination(),
+            context.fetchResponse(epoch, -1, MemoryRecords.EMPTY, -1, 
Errors.BROKER_NOT_AVAILABLE)
+        );
+        context.pollUntilRequest();
+
+        // We should retry the Fetch against the other voter since the original
+        // voter connection will be backing off.
+        RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest();
+        assertNotEquals(leaderId, fetchRequest2.destination().id());
+        
assertTrue(context.bootstrapIds.contains(fetchRequest2.destination().id()));
+        context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = { true, false })
+    public void testFollowerLeaderRediscoveryAfterRequestTimeout(boolean 
withKip853Rpc) throws Exception {
+        int localId = randomReplicaId();
+        int leaderId = localId + 1;
+        int otherNodeId = localId + 2;
+        int epoch = 5;
+        Set<Integer> voters = Utils.mkSet(leaderId, localId, otherNodeId);
+        List<InetSocketAddress> bootstrapServers = voters
+            .stream()
+            .map(RaftClientTestContext::mockAddress)
+            .collect(Collectors.toList());
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withBootstrapServers(Optional.of(bootstrapServers))
+            .withKip853Rpc(withKip853Rpc)
+            .withElectedLeader(epoch, leaderId)
+            .build();
+
+        context.pollUntilRequest();
+        RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest();
+        assertEquals(leaderId, fetchRequest1.destination().id());
+        context.assertFetchRequestData(fetchRequest1, epoch, 0L, 0);
+
+        context.time.sleep(context.requestTimeoutMs());
+        context.pollUntilRequest();
+
+        // We should retry the Fetch against the other voter since the original
+        // voter connection will be backing off.
+        RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest();
+        assertNotEquals(leaderId, fetchRequest2.destination().id());
+        
assertTrue(context.bootstrapIds.contains(fetchRequest2.destination().id()));
+        context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0);
+    }
+
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void 
testObserverLeaderRediscoveryAfterBrokerNotAvailableError(boolean 
withKip853Rpc) throws Exception {
@@ -2708,12 +2834,10 @@ public class KafkaRaftClientTest {
         
assertTrue(context.bootstrapIds.contains(fetchRequest2.destination().id()));
         context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0);
 
-        Errors error = fetchRequest2.destination().id() == leaderId ?
-            Errors.NONE : Errors.NOT_LEADER_OR_FOLLOWER;
         context.deliverResponse(
             fetchRequest2.correlationId(),
             fetchRequest2.destination(),
-            context.fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, 
error)
+            context.fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, 
Errors.NOT_LEADER_OR_FOLLOWER)
         );
         context.client.poll();
 

Reply via email to