This is an automated email from the ASF dual-hosted git repository.
jsancio pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.9 by this push:
new 550bf604605 KAFKA-16927; Handle expanding leader endpoints (#17363)
550bf604605 is described below
commit 550bf6046056d9ef34a50aa5a0c5b2303d1fe00b
Author: José Armando García Sancio <[email protected]>
AuthorDate: Fri Oct 4 10:51:43 2024 -0400
KAFKA-16927; Handle expanding leader endpoints (#17363)
When a replica restarts in the follower state, the set of leader endpoints it
knows about may not match the leader's latest set of endpoints. Voters discover
the latest set of leader endpoints through the BEGIN_QUORUM_EPOCH request. This
means that KRaft needs to allow a replica to transition from Follower to
Follower within the same epoch when the only change is that the set of leader
endpoints has expanded.
Reviewers: Colin P. McCabe <[email protected]>, Alyssa Huang
<[email protected]>
---
.../java/org/apache/kafka/raft/QuorumState.java | 54 ++++++++++++++++++----
.../kafka/raft/KafkaRaftClientReconfigTest.java | 39 ++++++++++++++++
.../org/apache/kafka/raft/KafkaRaftClientTest.java | 41 ++++++++++++++++
.../org/apache/kafka/raft/QuorumStateTest.java | 34 ++++++++++++++
.../apache/kafka/raft/RaftClientTestContext.java | 26 ++++++++++-
5 files changed, 185 insertions(+), 9 deletions(-)
diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
index 888273723f0..0598ce062df 100644
--- a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
@@ -470,15 +470,53 @@ public class QuorumState {
*/
public void transitionToFollower(int epoch, int leaderId, Endpoints
endpoints) {
int currentEpoch = state.epoch();
- if (localId.isPresent() && leaderId == localId.getAsInt()) {
- throw new IllegalStateException("Cannot transition to Follower
with leader " + leaderId +
- " and epoch " + epoch + " since it matches the local broker.id
" + localId);
+ if (endpoints.isEmpty()) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Cannot transition to Follower with leader %s and epoch %s
without a leader endpoint",
+ leaderId,
+ epoch
+ )
+ );
+ } else if (localId.isPresent() && leaderId == localId.getAsInt()) {
+ throw new IllegalStateException(
+ String.format(
+ "Cannot transition to Follower with leader %s and epoch %s
since it matches the local node.id %s",
+ leaderId,
+ epoch,
+ localId
+ )
+ );
} else if (epoch < currentEpoch) {
- throw new IllegalStateException("Cannot transition to Follower
with leader " + leaderId +
- " and epoch " + epoch + " since the current epoch " +
currentEpoch + " is larger");
- } else if (epoch == currentEpoch && (isFollower() || isLeader())) {
- throw new IllegalStateException("Cannot transition to Follower
with leader " + leaderId +
- " and epoch " + epoch + " from state " + state);
+ throw new IllegalStateException(
+ String.format(
+ "Cannot transition to Follower with leader %s and epoch %s
since the current epoch %s is larger",
+ leaderId,
+ epoch,
+ currentEpoch
+ )
+ );
+ } else if (epoch == currentEpoch) {
+ if (isFollower() && state.leaderEndpoints().size() >=
endpoints.size()) {
+ throw new IllegalStateException(
+ String.format(
+ "Cannot transition to Follower with leader %s, epoch
%s and endpoints %s from state %s",
+ leaderId,
+ epoch,
+ endpoints,
+ state
+ )
+ );
+ } else if (isLeader()) {
+ throw new IllegalStateException(
+ String.format(
+ "Cannot transition to Follower with leader %s and
epoch %s from state %s",
+ leaderId,
+ epoch,
+ state
+ )
+ );
+ }
}
durableTransitionTo(
diff --git
a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
index 63e7f046b10..04ab47f34c9 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
@@ -2257,6 +2257,45 @@ public class KafkaRaftClientReconfigTest {
assertEquals(-2, fetchRequest.destination().id());
}
+ @Test
+ public void testHandleBeginQuorumRequestMoreEndpoints() throws Exception {
+ ReplicaKey local = replicaKey(randomReplicaId(), true);
+ ReplicaKey leader = replicaKey(local.id() + 1, true);
+ int leaderEpoch = 3;
+
+ VoterSet voters = VoterSetTest.voterSet(Stream.of(local, leader));
+
+ RaftClientTestContext context = new
RaftClientTestContext.Builder(local.id(), local.directoryId().get())
+ .withBootstrapSnapshot(Optional.of(voters))
+ .withElectedLeader(leaderEpoch, leader.id())
+ .withKip853Rpc(true)
+ .build();
+
+ context.client.poll();
+
+ HashMap<ListenerName, InetSocketAddress> leaderListenersMap = new
HashMap<>(2);
+ leaderListenersMap.put(
+ VoterSetTest.DEFAULT_LISTENER_NAME,
+ InetSocketAddress.createUnresolved("localhost", 9990 + leader.id())
+ );
+ leaderListenersMap.put(
+ ListenerName.normalised("ANOTHER_LISTENER"),
+ InetSocketAddress.createUnresolved("localhost", 8990 + leader.id())
+ );
+ Endpoints leaderEndpoints =
Endpoints.fromInetSocketAddresses(leaderListenersMap);
+
+ context.deliverRequest(context.beginEpochRequest(leaderEpoch,
leader.id(), leaderEndpoints));
+ context.pollUntilResponse();
+
+ context.assertElectedLeader(leaderEpoch, leader.id());
+
+ context.assertSentBeginQuorumEpochResponse(
+ Errors.NONE,
+ leaderEpoch,
+ OptionalInt.of(leader.id())
+ );
+ }
+
private static void verifyVotersRecord(
VoterSet expectedVoterSet,
ByteBuffer recordKey,
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index 1477ee0a39e..6e5048e9e64 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.message.VoteRequestData;
import org.apache.kafka.common.message.VoteResponseData;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;
@@ -55,6 +56,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
@@ -960,6 +962,45 @@ public class KafkaRaftClientTest {
);
}
+ @Test
+ public void testHandleBeginQuorumRequestMoreEndpoints() throws Exception {
+ ReplicaKey local = replicaKey(randomReplicaId(), true);
+ ReplicaKey leader = replicaKey(local.id() + 1, true);
+ int leaderEpoch = 3;
+
+ VoterSet voters = VoterSetTest.voterSet(Stream.of(local, leader));
+
+ RaftClientTestContext context = new
RaftClientTestContext.Builder(local.id(), local.directoryId().get())
+ .withStaticVoters(voters)
+ .withElectedLeader(leaderEpoch, leader.id())
+ .withKip853Rpc(true)
+ .build();
+
+ context.client.poll();
+
+ HashMap<ListenerName, InetSocketAddress> leaderListenersMap = new
HashMap<>(2);
+ leaderListenersMap.put(
+ VoterSetTest.DEFAULT_LISTENER_NAME,
+ InetSocketAddress.createUnresolved("localhost", 9990 + leader.id())
+ );
+ leaderListenersMap.put(
+ ListenerName.normalised("ANOTHER_LISTENER"),
+ InetSocketAddress.createUnresolved("localhost", 8990 + leader.id())
+ );
+ Endpoints leaderEndpoints =
Endpoints.fromInetSocketAddresses(leaderListenersMap);
+
+ context.deliverRequest(context.beginEpochRequest(leaderEpoch,
leader.id(), leaderEndpoints));
+ context.pollUntilResponse();
+
+ context.assertElectedLeader(leaderEpoch, leader.id());
+
+ context.assertSentBeginQuorumEpochResponse(
+ Errors.NONE,
+ leaderEpoch,
+ OptionalInt.of(leader.id())
+ );
+ }
+
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testHandleBeginQuorumResponse(boolean withKip853Rpc) throws
Exception {
diff --git a/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java
b/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java
index fc14f4d9bc3..7131701da76 100644
--- a/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.raft;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
@@ -32,6 +33,7 @@ import org.mockito.Mockito;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
@@ -1211,6 +1213,38 @@ public class QuorumStateTest {
);
}
+ @ParameterizedTest
+ @EnumSource(value = KRaftVersion.class)
+ public void testFollowerToFollowerSameEpochAndMoreEndpoints(KRaftVersion
kraftVersion) {
+ int node1 = 1;
+ int node2 = 2;
+ VoterSet voters = localWithRemoteVoterSet(IntStream.of(node1, node2),
kraftVersion);
+ QuorumState state = initializeEmptyState(voters, kraftVersion);
+ state.initialize(new OffsetAndEpoch(0L, logEndEpoch));
+ state.transitionToFollower(
+ 8,
+ node2,
+ voters.listeners(node2)
+ );
+
+ HashMap<ListenerName, InetSocketAddress> newNode2ListenersMap = new
HashMap<>(2);
+ newNode2ListenersMap.put(
+ VoterSetTest.DEFAULT_LISTENER_NAME,
+ InetSocketAddress.createUnresolved("localhost", 9990 + node2)
+ );
+ newNode2ListenersMap.put(
+ ListenerName.normalised("ANOTHER_LISTENER"),
+ InetSocketAddress.createUnresolved("localhost", 8990 + node2)
+ );
+ Endpoints newNode2Endpoints =
Endpoints.fromInetSocketAddresses(newNode2ListenersMap);
+
+ state.transitionToFollower(
+ 8,
+ node2,
+ newNode2Endpoints
+ );
+ }
+
@ParameterizedTest
@EnumSource(value = KRaftVersion.class)
public void testFollowerToFollowerHigherEpoch(KRaftVersion kraftVersion) {
diff --git
a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index 0f1cfd6f3c4..7b1a55c1e02 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -1367,6 +1367,14 @@ public final class RaftClientTestContext {
return beginEpochRequest(clusterId, epoch, leaderId);
}
+ BeginQuorumEpochRequestData beginEpochRequest(int epoch, int leaderId,
Endpoints endpoints) {
+ ReplicaKey localReplicaKey = kip853Rpc ?
+ ReplicaKey.of(localIdOrThrow(), localDirectoryId) :
+ ReplicaKey.of(-1, ReplicaKey.NO_DIRECTORY_ID);
+
+ return beginEpochRequest(clusterId, epoch, leaderId, endpoints,
localReplicaKey);
+ }
+
BeginQuorumEpochRequestData beginEpochRequest(String clusterId, int epoch,
int leaderId) {
ReplicaKey localReplicaKey = kip853Rpc ?
ReplicaKey.of(localIdOrThrow(), localDirectoryId) :
@@ -1380,13 +1388,29 @@ public final class RaftClientTestContext {
int epoch,
int leaderId,
ReplicaKey voterKey
+ ) {
+ return beginEpochRequest(
+ clusterId,
+ epoch,
+ leaderId,
+ startingVoters.listeners(leaderId),
+ voterKey
+ );
+ }
+
+ BeginQuorumEpochRequestData beginEpochRequest(
+ String clusterId,
+ int epoch,
+ int leaderId,
+ Endpoints endpoints,
+ ReplicaKey voterKey
) {
return RaftUtil.singletonBeginQuorumEpochRequest(
metadataPartition,
clusterId,
epoch,
leaderId,
- startingVoters.listeners(leaderId),
+ endpoints,
voterKey
);
}