This is an automated email from the ASF dual-hosted git repository.
jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e1bf1552709 KAFKA-17055; Use random replica ids in kraft protocol tests
e1bf1552709 is described below
commit e1bf155270926ce236991cef71a6d100a5dd8432
Author: Mason Chen <[email protected]>
AuthorDate: Wed Jul 17 03:33:32 2024 +1200
KAFKA-17055; Use random replica ids in kraft protocol tests
All of the tests in KafkakRaftClientTest and KafkaRaftClientSnapshotTest
use well known ids like 0, 1, etc. Because of this those tests were not able to
catch a bug in the BeginQuorumEpoch schema were the default value for VoterId
was 0 instead of -1.
Improve those tests by using random valid replica id to lower the
probability that the replica id will match the default value of the schema.
Reviewers: José Armando García Sancio <[email protected]>
---
.../kafka/raft/KafkaRaftClientSnapshotTest.java | 155 ++++---
.../org/apache/kafka/raft/KafkaRaftClientTest.java | 506 +++++++++++----------
.../apache/kafka/raft/RaftClientTestContext.java | 36 +-
3 files changed, 370 insertions(+), 327 deletions(-)
diff --git
a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
index 11fefd347ca..dac88b1b9cc 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
@@ -52,6 +52,7 @@ import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
@@ -64,7 +65,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public final class KafkaRaftClientSnapshotTest {
@Test
public void testLatestSnapshotId() throws Exception {
- int localId = 0;
+ int localId = randomReplicaId();
int leaderId = localId + 1;
Set<Integer> voters = Utils.mkSet(localId, leaderId);
int epoch = 2;
@@ -82,7 +83,7 @@ public final class KafkaRaftClientSnapshotTest {
@Test
public void testLatestSnapshotIdMissing() throws Exception {
- int localId = 0;
+ int localId = randomReplicaId();
int leaderId = localId + 1;
Set<Integer> voters = Utils.mkSet(localId, leaderId);
int epoch = 2;
@@ -98,9 +99,9 @@ public final class KafkaRaftClientSnapshotTest {
}
@ParameterizedTest
- @CsvSource({"false,false", "false,true", "true,false", "true,true"})
+ @CsvSource({ "false,false", "false,true", "true,false", "true,true" })
public void testLeaderListenerNotified(boolean entireLog, boolean
withKip853Rpc) throws Exception {
- int localId = 0;
+ int localId = randomReplicaId();
ReplicaKey otherNodeKey = replicaKey(localId + 1, false);
Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
OffsetAndEpoch snapshotId = new OffsetAndEpoch(3, 1);
@@ -135,9 +136,9 @@ public final class KafkaRaftClientSnapshotTest {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
+ @ValueSource(booleans = { true, false })
public void testFollowerListenerNotified(boolean entireLog) throws
Exception {
- int localId = 0;
+ int localId = randomReplicaId();
int leaderId = localId + 1;
Set<Integer> voters = Utils.mkSet(localId, leaderId);
int epoch = 2;
@@ -177,9 +178,9 @@ public final class KafkaRaftClientSnapshotTest {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
+ @ValueSource(booleans = { true, false })
public void testSecondListenerNotified(boolean entireLog) throws Exception
{
- int localId = 0;
+ int localId = randomReplicaId();
int leaderId = localId + 1;
Set<Integer> voters = Utils.mkSet(localId, leaderId);
int epoch = 2;
@@ -223,9 +224,9 @@ public final class KafkaRaftClientSnapshotTest {
}
@ParameterizedTest
- @ValueSource(booleans = {false, true})
+ @ValueSource(booleans = { false, true })
public void testListenerRenotified(boolean withKip853Rpc) throws Exception
{
- int localId = 0;
+ int localId = randomReplicaId();
ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
OffsetAndEpoch snapshotId = new OffsetAndEpoch(3, 1);
@@ -279,10 +280,10 @@ public final class KafkaRaftClientSnapshotTest {
}
@ParameterizedTest
- @ValueSource(booleans = {false, true})
+ @ValueSource(booleans = { false, true })
public void testLeaderImmediatelySendsSnapshotId(boolean withKip853Rpc)
throws Exception {
- int localId = 0;
- ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+ int localId = randomReplicaId();
+ ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
OffsetAndEpoch snapshotId = new OffsetAndEpoch(3, 4);
@@ -313,9 +314,9 @@ public final class KafkaRaftClientSnapshotTest {
}
@ParameterizedTest
- @ValueSource(booleans = {false, true})
+ @ValueSource(booleans = { false, true })
public void testFetchRequestOffsetLessThanLogStart(boolean withKip853Rpc)
throws Exception {
- int localId = 0;
+ int localId = randomReplicaId();
ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
@@ -361,10 +362,10 @@ public final class KafkaRaftClientSnapshotTest {
}
@ParameterizedTest
- @ValueSource(booleans = {false, true})
+ @ValueSource(booleans = { false, true })
public void testFetchRequestOffsetAtZero(boolean withKip853Rpc) throws
Exception {
// When the follower sends a FETCH request at offset 0, reply with
snapshot id if it exists
- int localId = 0;
+ int localId = randomReplicaId();
ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
@@ -409,9 +410,9 @@ public final class KafkaRaftClientSnapshotTest {
}
@ParameterizedTest
- @ValueSource(booleans = {false, true})
+ @ValueSource(booleans = { false, true })
public void testFetchRequestWithLargerLastFetchedEpoch(boolean
withKip853Rpc) throws Exception {
- int localId = 0;
+ int localId = randomReplicaId();
ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
@@ -449,9 +450,9 @@ public final class KafkaRaftClientSnapshotTest {
}
@ParameterizedTest
- @ValueSource(booleans = {false, true})
+ @ValueSource(booleans = { false, true })
public void testFetchRequestTruncateToLogStart(boolean withKip853Rpc)
throws Exception {
- int localId = 0;
+ int localId = randomReplicaId();
ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
int syncNodeId = otherNodeKey.id() + 1;
Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id(),
syncNodeId);
@@ -499,9 +500,9 @@ public final class KafkaRaftClientSnapshotTest {
}
@ParameterizedTest
- @ValueSource(booleans = {false, true})
+ @ValueSource(booleans = { false, true })
public void testFetchRequestAtLogStartOffsetWithValidEpoch(boolean
withKip853Rpc) throws Exception {
- int localId = 0;
+ int localId = randomReplicaId();
ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
int syncNodeId = otherNodeKey.id() + 1;
Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id(),
syncNodeId);
@@ -545,9 +546,9 @@ public final class KafkaRaftClientSnapshotTest {
}
@ParameterizedTest
- @ValueSource(booleans = {false, true})
+ @ValueSource(booleans = { false, true })
public void testFetchRequestAtLogStartOffsetWithInvalidEpoch(boolean
withKip853Rpc) throws Exception {
- int localId = 0;
+ int localId = randomReplicaId();
ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
int syncNodeId = otherNodeKey.id() + 1;
Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id(),
syncNodeId);
@@ -597,11 +598,11 @@ public final class KafkaRaftClientSnapshotTest {
}
@ParameterizedTest
- @ValueSource(booleans = {false, true})
+ @ValueSource(booleans = { false, true })
public void testFetchRequestWithLastFetchedEpochLessThanOldestSnapshot(
boolean withKip853Rpc
) throws Exception {
- int localId = 0;
+ int localId = randomReplicaId();
ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
int syncNodeId = otherNodeKey.id() + 1;
Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id(),
syncNodeId);
@@ -650,9 +651,9 @@ public final class KafkaRaftClientSnapshotTest {
}
@ParameterizedTest
- @ValueSource(booleans = {false, true})
+ @ValueSource(booleans = { false, true })
public void testFetchSnapshotRequestMissingSnapshot(boolean withKip853Rpc)
throws Exception {
- int localId = 0;
+ int localId = randomReplicaId();
Set<Integer> voters = Utils.mkSet(localId, localId + 1);
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
@@ -713,9 +714,9 @@ public final class KafkaRaftClientSnapshotTest {
}
@ParameterizedTest
- @ValueSource(booleans = {false, true})
+ @ValueSource(booleans = { false, true })
public void testFetchSnapshotRequestUnknownPartition(boolean
withKip853Rpc) throws Exception {
- int localId = 0;
+ int localId = randomReplicaId();
Set<Integer> voters = Utils.mkSet(localId, localId + 1);
TopicPartition topicPartition = new TopicPartition("unknown", 0);
@@ -744,9 +745,9 @@ public final class KafkaRaftClientSnapshotTest {
}
@ParameterizedTest
- @ValueSource(booleans = {false, true})
+ @ValueSource(booleans = { false, true })
public void testFetchSnapshotRequestAsLeader(boolean withKip853Rpc) throws
Exception {
- int localId = 0;
+ int localId = randomReplicaId();
Set<Integer> voters = Utils.mkSet(localId, localId + 1);
OffsetAndEpoch snapshotId = new OffsetAndEpoch(1, 1);
List<String> records = Arrays.asList("foo", "bar");
@@ -795,14 +796,14 @@ public final class KafkaRaftClientSnapshotTest {
}
@ParameterizedTest
- @ValueSource(booleans = {false, true})
+ @ValueSource(booleans = { false, true })
public void
testLeaderShouldResignLeadershipIfNotGetFetchSnapshotRequestFromMajorityVoters(
boolean withKip853Rpc
) throws Exception {
- int localId = 0;
- ReplicaKey voter1 = replicaKey(1, withKip853Rpc);
- ReplicaKey voter2 = replicaKey(2, withKip853Rpc);
- ReplicaKey observer3 = replicaKey(3, withKip853Rpc);
+ int localId = randomReplicaId();
+ ReplicaKey voter1 = replicaKey(localId + 1, withKip853Rpc);
+ ReplicaKey voter2 = replicaKey(localId + 2, withKip853Rpc);
+ ReplicaKey observer3 = replicaKey(localId + 3, withKip853Rpc);
Set<Integer> voters = Utils.mkSet(localId, voter1.id(), voter2.id());
OffsetAndEpoch snapshotId = new OffsetAndEpoch(1, 1);
List<String> records = Arrays.asList("foo", "bar");
@@ -890,9 +891,9 @@ public final class KafkaRaftClientSnapshotTest {
}
@ParameterizedTest
- @ValueSource(booleans = {false, true})
+ @ValueSource(booleans = { false, true })
public void testPartialFetchSnapshotRequestAsLeader(boolean withKip853Rpc)
throws Exception {
- int localId = 0;
+ int localId = randomReplicaId();
Set<Integer> voters = Utils.mkSet(localId, localId + 1);
OffsetAndEpoch snapshotId = new OffsetAndEpoch(2, 1);
List<String> records = Arrays.asList("foo", "bar");
@@ -971,9 +972,9 @@ public final class KafkaRaftClientSnapshotTest {
}
@ParameterizedTest
- @ValueSource(booleans = {false, true})
+ @ValueSource(booleans = { false, true })
public void testFetchSnapshotRequestAsFollower(boolean withKip853Rpc)
throws IOException {
- int localId = 0;
+ int localId = randomReplicaId();
int leaderId = localId + 1;
Set<Integer> voters = Utils.mkSet(localId, leaderId);
int epoch = 2;
@@ -1003,9 +1004,9 @@ public final class KafkaRaftClientSnapshotTest {
}
@ParameterizedTest
- @ValueSource(booleans = {false, true})
+ @ValueSource(booleans = { false, true })
public void testFetchSnapshotRequestWithInvalidPosition(boolean
withKip853Rpc) throws Exception {
- int localId = 0;
+ int localId = randomReplicaId();
Set<Integer> voters = Utils.mkSet(localId, localId + 1);
OffsetAndEpoch snapshotId = new OffsetAndEpoch(1, 1);
List<String> records = Arrays.asList("foo", "bar");
@@ -1063,9 +1064,9 @@ public final class KafkaRaftClientSnapshotTest {
}
@ParameterizedTest
- @ValueSource(booleans = {false, true})
+ @ValueSource(booleans = { false, true })
public void testFetchSnapshotRequestWithOlderEpoch(boolean withKip853Rpc)
throws Exception {
- int localId = 0;
+ int localId = randomReplicaId();
Set<Integer> voters = Utils.mkSet(localId, localId + 1);
OffsetAndEpoch snapshotId = Snapshots.BOOTSTRAP_SNAPSHOT_ID;
@@ -1096,9 +1097,9 @@ public final class KafkaRaftClientSnapshotTest {
}
@ParameterizedTest
- @ValueSource(booleans = {false, true})
+ @ValueSource(booleans = { false, true })
public void testFetchSnapshotRequestWithNewerEpoch(boolean withKip853Rpc)
throws Exception {
- int localId = 0;
+ int localId = randomReplicaId();
Set<Integer> voters = Utils.mkSet(localId, localId + 1);
OffsetAndEpoch snapshotId = Snapshots.BOOTSTRAP_SNAPSHOT_ID;
@@ -1129,9 +1130,9 @@ public final class KafkaRaftClientSnapshotTest {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
+ @ValueSource(booleans = { true, false })
public void testFetchResponseWithInvalidSnapshotId(boolean withKip853Rpc)
throws Exception {
- int localId = 0;
+ int localId = randomReplicaId();
int leaderId = localId + 1;
Set<Integer> voters = Utils.mkSet(localId, leaderId);
int epoch = 2;
@@ -1192,9 +1193,9 @@ public final class KafkaRaftClientSnapshotTest {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
+ @ValueSource(booleans = { true, false })
public void testFetchResponseWithSnapshotId(boolean withKip853Rpc) throws
Exception {
- int localId = 0;
+ int localId = randomReplicaId();
int leaderId = localId + 1;
Set<Integer> voters = Utils.mkSet(localId, leaderId);
int epoch = 2;
@@ -1265,9 +1266,9 @@ public final class KafkaRaftClientSnapshotTest {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
+ @ValueSource(booleans = { true, false })
public void testFetchSnapshotResponsePartialData(boolean withKip853Rpc)
throws Exception {
- int localId = 0;
+ int localId = randomReplicaId();
int leaderId = localId + 1;
Set<Integer> voters = Utils.mkSet(localId, leaderId);
int epoch = 2;
@@ -1370,9 +1371,9 @@ public final class KafkaRaftClientSnapshotTest {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
+ @ValueSource(booleans = { true, false })
public void testFetchSnapshotResponseMissingSnapshot(boolean
withKip853Rpc) throws Exception {
- int localId = 0;
+ int localId = randomReplicaId();
int leaderId = localId + 1;
Set<Integer> voters = Utils.mkSet(localId, leaderId);
int epoch = 2;
@@ -1431,9 +1432,9 @@ public final class KafkaRaftClientSnapshotTest {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
+ @ValueSource(booleans = { true, false })
public void testFetchSnapshotResponseFromNewerEpochNotLeader(boolean
withKip853Rpc) throws Exception {
- int localId = 0;
+ int localId = randomReplicaId();
int firstLeaderId = localId + 1;
int secondLeaderId = firstLeaderId + 1;
Set<Integer> voters = Utils.mkSet(localId, firstLeaderId,
secondLeaderId);
@@ -1493,9 +1494,9 @@ public final class KafkaRaftClientSnapshotTest {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
+ @ValueSource(booleans = { true, false })
public void testFetchSnapshotResponseFromNewerEpochLeader(boolean
withKip853Rpc) throws Exception {
- int localId = 0;
+ int localId = randomReplicaId();
int leaderId = localId + 1;
Set<Integer> voters = Utils.mkSet(localId, leaderId);
int epoch = 2;
@@ -1554,9 +1555,9 @@ public final class KafkaRaftClientSnapshotTest {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
+ @ValueSource(booleans = { true, false })
public void testFetchSnapshotResponseFromOlderEpoch(boolean withKip853Rpc)
throws Exception {
- int localId = 0;
+ int localId = randomReplicaId();
int leaderId = localId + 1;
Set<Integer> voters = Utils.mkSet(localId, leaderId);
int epoch = 2;
@@ -1625,9 +1626,9 @@ public final class KafkaRaftClientSnapshotTest {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
+ @ValueSource(booleans = { true, false })
public void testFetchSnapshotResponseWithInvalidId(boolean withKip853Rpc)
throws Exception {
- int localId = 0;
+ int localId = randomReplicaId();
int leaderId = localId + 1;
Set<Integer> voters = Utils.mkSet(localId, leaderId);
int epoch = 2;
@@ -1741,9 +1742,9 @@ public final class KafkaRaftClientSnapshotTest {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
+ @ValueSource(booleans = { true, false })
public void testFetchSnapshotResponseToNotFollower(boolean withKip853Rpc)
throws Exception {
- int localId = 0;
+ int localId = randomReplicaId();
int leaderId = localId + 1;
Set<Integer> voters = Utils.mkSet(localId, leaderId);
int epoch = 2;
@@ -1815,12 +1816,12 @@ public final class KafkaRaftClientSnapshotTest {
}
@ParameterizedTest
- @ValueSource(booleans = {false, true})
+ @ValueSource(booleans = { false, true })
public void testFetchSnapshotRequestClusterIdValidation(
boolean withKip853Rpc
) throws Exception {
- int localId = 0;
- ReplicaKey otherNode = replicaKey(1, withKip853Rpc);
+ int localId = randomReplicaId();
+ ReplicaKey otherNode = replicaKey(localId + 1, withKip853Rpc);
Set<Integer> voters = Utils.mkSet(localId, otherNode.id());
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
@@ -1893,9 +1894,9 @@ public final class KafkaRaftClientSnapshotTest {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
+ @ValueSource(booleans = { true, false })
public void testCreateSnapshotAsLeaderWithInvalidSnapshotId(boolean
withKip853Rpc) throws Exception {
- int localId = 0;
+ int localId = randomReplicaId();
int otherNodeId = localId + 1;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
int epoch = 2;
@@ -1941,11 +1942,11 @@ public final class KafkaRaftClientSnapshotTest {
}
@ParameterizedTest
- @ValueSource(booleans = {true, false})
+ @ValueSource(booleans = { true, false })
public void testCreateSnapshotAsFollowerWithInvalidSnapshotId(boolean
withKip853Rpc) throws Exception {
- int localId = 0;
- int leaderId = 1;
- int otherFollowerId = 2;
+ int localId = randomReplicaId();
+ int leaderId = localId + 1;
+ int otherFollowerId = localId + 2;
int epoch = 5;
Set<Integer> voters = Utils.mkSet(localId, leaderId, otherFollowerId);
@@ -2014,6 +2015,10 @@ public final class KafkaRaftClientSnapshotTest {
return ReplicaKey.of(id, directoryId);
}
+ private static int randomReplicaId() {
+ return ThreadLocalRandom.current().nextInt(1025);
+ }
+
public static FetchSnapshotRequestData fetchSnapshotRequest(
TopicPartition topicPartition,
int epoch,
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index 9e1258ba666..aaf7f6c93c5 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -26,6 +26,7 @@ import
org.apache.kafka.common.message.DescribeQuorumResponseData.ReplicaState;
import org.apache.kafka.common.message.EndQuorumEpochResponseData;
import org.apache.kafka.common.message.FetchRequestData;
import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.message.VoteRequestData;
import org.apache.kafka.common.message.VoteResponseData;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
@@ -62,6 +63,7 @@ import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -82,7 +84,7 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testInitializeSingleMemberQuorum(boolean withKip853Rpc) throws
IOException {
- int localId = 0;
+ int localId = randomReplicaId();
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, Collections.singleton(localId))
.withKip853Rpc(withKip853Rpc)
.build();
@@ -94,8 +96,7 @@ public class KafkaRaftClientTest {
@ValueSource(booleans = { true, false })
public void testInitializeAsLeaderFromStateStoreSingleMemberQuorum(boolean
withKip853Rpc) throws Exception {
// Start off as leader. We should still bump the epoch after
initialization
-
- int localId = 0;
+ int localId = randomReplicaId();
int initialEpoch = 2;
Set<Integer> voters = Collections.singleton(localId);
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
@@ -114,8 +115,9 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testRejectVotesFromSameEpochAfterResigningLeadership(boolean
withKip853Rpc) throws Exception {
- int localId = 0;
- ReplicaKey remoteKey = replicaKey(1, withKip853Rpc);
+ int localId = randomReplicaId();
+ int remoteId = localId + 1;
+ ReplicaKey remoteKey = replicaKey(remoteId, withKip853Rpc);
Set<Integer> voters = Utils.mkSet(localId, remoteKey.id());
int epoch = 2;
@@ -145,8 +147,9 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testRejectVotesFromSameEpochAfterResigningCandidacy(boolean
withKip853Rpc) throws Exception {
- int localId = 0;
- ReplicaKey remoteKey = replicaKey(1, withKip853Rpc);
+ int localId = randomReplicaId();
+ int remoteId = localId + 1;
+ ReplicaKey remoteKey = replicaKey(remoteId, withKip853Rpc);
Set<Integer> voters = Utils.mkSet(localId, remoteKey.id());
int epoch = 2;
@@ -176,8 +179,9 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testGrantVotesFromHigherEpochAfterResigningLeadership(boolean
withKip853Rpc) throws Exception {
- int localId = 0;
- ReplicaKey remoteKey = replicaKey(1, withKip853Rpc);
+ int localId = randomReplicaId();
+ int remoteId = localId + 1;
+ ReplicaKey remoteKey = replicaKey(remoteId, withKip853Rpc);
Set<Integer> voters = Utils.mkSet(localId, remoteKey.id());
int epoch = 2;
@@ -212,8 +216,9 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testGrantVotesFromHigherEpochAfterResigningCandidacy(boolean
withKip853Rpc) throws Exception {
- int localId = 0;
- ReplicaKey remoteKey = replicaKey(1, withKip853Rpc);
+ int localId = randomReplicaId();
+ int remoteId = localId + 1;
+ ReplicaKey remoteKey = replicaKey(remoteId, withKip853Rpc);
Set<Integer> voters = Utils.mkSet(localId, remoteKey.id());
int epoch = 2;
@@ -248,8 +253,9 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testGrantVotesWhenShuttingDown(boolean withKip853Rpc) throws
Exception {
- int localId = 0;
- ReplicaKey remoteKey = replicaKey(1, withKip853Rpc);
+ int localId = randomReplicaId();
+ int remoteId = localId + 1;
+ ReplicaKey remoteKey = replicaKey(remoteId, withKip853Rpc);
Set<Integer> voters = Utils.mkSet(localId, remoteKey.id());
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
@@ -275,7 +281,13 @@ public class KafkaRaftClientTest {
context.client.poll();
// We will first transition to unattached and then grant vote and then
transition to voted
- assertTrue(context.client.quorum().isVoted());
+ assertTrue(
+ context.client.quorum().isVoted(),
+ "Local Id: " + localId +
+ " Remote Id: " + remoteId +
+ " Quorum local Id: " + context.client.quorum().localIdOrSentinel()
+
+ " Quorum leader Id: " +
context.client.quorum().leaderIdOrSentinel()
+ );
context.assertVotedCandidate(epoch + 1, remoteKey.id());
context.assertSentVoteResponse(Errors.NONE, epoch + 1,
OptionalInt.empty(), true);
}
@@ -283,8 +295,8 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testInitializeAsResignedAndBecomeCandidate(boolean
withKip853Rpc) throws Exception {
- int localId = 0;
- int remoteId = 1;
+ int localId = randomReplicaId();
+ int remoteId = localId + 1;
Set<Integer> voters = Utils.mkSet(localId, remoteId);
int epoch = 2;
@@ -311,8 +323,9 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testInitializeAsResignedLeaderFromStateStore(boolean
withKip853Rpc) throws Exception {
- int localId = 0;
- Set<Integer> voters = Utils.mkSet(localId, 1);
+ int localId = randomReplicaId();
+ int remoteId = localId + 1;
+ Set<Integer> voters = Utils.mkSet(localId, remoteId);
int epoch = 2;
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
@@ -329,7 +342,7 @@ public class KafkaRaftClientTest {
assertThrows(NotLeaderException.class, () ->
context.client.scheduleAppend(epoch, Arrays.asList("a", "b")));
context.pollUntilRequest();
- RaftRequest.Outbound request =
context.assertSentEndQuorumEpochRequest(epoch, 1);
+ RaftRequest.Outbound request =
context.assertSentEndQuorumEpochRequest(epoch, remoteId);
context.deliverResponse(
request.correlationId(),
request.destination(),
@@ -346,8 +359,9 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testAppendFailedWithNotLeaderException(boolean withKip853Rpc)
throws Exception {
- int localId = 0;
- Set<Integer> voters = Utils.mkSet(localId, 1);
+ int localId = randomReplicaId();
+ int remoteId = localId + 1;
+ Set<Integer> voters = Utils.mkSet(localId, remoteId);
int epoch = 2;
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
@@ -361,8 +375,8 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testAppendFailedWithBufferAllocationException(boolean
withKip853Rpc) throws Exception {
- int localId = 0;
- int otherNodeId = 1;
+ int localId = randomReplicaId();
+ int otherNodeId = localId + 1;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
MemoryPool memoryPool = Mockito.mock(MemoryPool.class);
@@ -388,8 +402,8 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testAppendFailedWithFencedEpoch(boolean withKip853Rpc) throws
Exception {
- int localId = 0;
- int otherNodeId = 1;
+ int localId = randomReplicaId();
+ int otherNodeId = localId + 1;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
@@ -409,8 +423,8 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testAppendFailedWithRecordBatchTooLargeException(boolean
withKip853Rpc) throws Exception {
- int localId = 0;
- int otherNodeId = 1;
+ int localId = randomReplicaId();
+ int otherNodeId = localId + 1;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
@@ -427,15 +441,15 @@ public class KafkaRaftClientTest {
batchToLarge.add("a");
assertThrows(RecordBatchTooLargeException.class,
- () -> context.client.scheduleAtomicAppend(epoch,
OptionalLong.empty(), batchToLarge));
+ () -> context.client.scheduleAtomicAppend(epoch,
OptionalLong.empty(), batchToLarge));
}
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testEndQuorumEpochRetriesWhileResigned(boolean withKip853Rpc)
throws Exception {
- int localId = 0;
- int voter1 = 1;
- int voter2 = 2;
+ int localId = randomReplicaId();
+ int voter1 = localId + 1;
+ int voter2 = localId + 2;
Set<Integer> voters = Utils.mkSet(localId, voter1, voter2);
int epoch = 19;
@@ -479,8 +493,9 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testResignWillCompleteFetchPurgatory(boolean withKip853Rpc)
throws Exception {
- int localId = 0;
- ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+ int localId = randomReplicaId();
+ int remoteId = localId + 1;
+ ReplicaKey otherNodeKey = replicaKey(remoteId, withKip853Rpc);
Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
@@ -517,8 +532,8 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testResignInOlderEpochIgnored(boolean withKip853Rpc) throws
Exception {
- int localId = 0;
- int otherNodeId = 1;
+ int localId = randomReplicaId();
+ int otherNodeId = localId + 1;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
@@ -543,9 +558,9 @@ public class KafkaRaftClientTest {
public void testHandleBeginQuorumEpochAfterUserInitiatedResign(
boolean withKip853Rpc
) throws Exception {
- int localId = 0;
- int remoteId1 = 1;
- int remoteId2 = 2;
+ int localId = randomReplicaId();
+ int remoteId1 = localId + 1;
+ int remoteId2 = localId + 2;
Set<Integer> voters = Utils.mkSet(localId, remoteId1, remoteId2);
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
@@ -571,9 +586,9 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testBeginQuorumEpochHeartbeat(boolean withKip853Rpc) throws
Exception {
- int localId = 0;
- int remoteId1 = 1;
- int remoteId2 = 2;
+ int localId = randomReplicaId();
+ int remoteId1 = localId + 1;
+ int remoteId2 = localId + 2;
Set<Integer> voters = Utils.mkSet(localId, remoteId1, remoteId2);
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
@@ -601,10 +616,13 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void
testLeaderShouldResignLeadershipIfNotGetFetchRequestFromMajorityVoters(boolean
withKip853Rpc) throws Exception {
- int localId = 0;
- ReplicaKey remoteKey1 = replicaKey(1, withKip853Rpc);
- ReplicaKey remoteKey2 = replicaKey(2, withKip853Rpc);
- ReplicaKey observerKey3 = replicaKey(3, withKip853Rpc);
+ int localId = randomReplicaId();
+ int remoteId1 = localId + 1;
+ int remoteId2 = localId + 2;
+ int observerId = localId + 3;
+ ReplicaKey remoteKey1 = replicaKey(remoteId1, withKip853Rpc);
+ ReplicaKey remoteKey2 = replicaKey(remoteId2, withKip853Rpc);
+ ReplicaKey observerKey3 = replicaKey(observerId, withKip853Rpc);
Set<Integer> voters = Utils.mkSet(localId, remoteKey1.id(),
remoteKey2.id());
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
@@ -658,7 +676,7 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testLeaderShouldNotResignLeadershipIfOnlyOneVoters(boolean
withKip853Rpc) throws Exception {
- int localId = 0;
+ int localId = randomReplicaId();
Set<Integer> voters = Utils.mkSet(localId);
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
@@ -677,8 +695,8 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testElectionTimeoutAfterUserInitiatedResign(boolean
withKip853Rpc) throws Exception {
- int localId = 0;
- int otherNodeId = 1;
+ int localId = randomReplicaId();
+ int otherNodeId = localId + 1;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
@@ -730,8 +748,8 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testCannotResignWithLargerEpochThanCurrentEpoch(boolean
withKip853Rpc) throws Exception {
- int localId = 0;
- int otherNodeId = 1;
+ int localId = randomReplicaId();
+ int otherNodeId = localId + 1;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
@@ -746,8 +764,8 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testCannotResignIfNotLeader(boolean withKip853Rpc) throws
Exception {
- int localId = 0;
- int otherNodeId = 1;
+ int localId = randomReplicaId();
+ int otherNodeId = localId + 1;
int leaderEpoch = 2;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
@@ -763,8 +781,8 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testCannotResignIfObserver(boolean withKip853Rpc) throws
Exception {
- int leaderId = 1;
- int otherNodeId = 2;
+ int leaderId = randomReplicaId();
+ int otherNodeId = randomReplicaId() + 1;
int epoch = 5;
Set<Integer> voters = Utils.mkSet(leaderId, otherNodeId);
@@ -791,9 +809,9 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testInitializeAsCandidateFromStateStore(boolean withKip853Rpc)
throws Exception {
- int localId = 0;
+ int localId = randomReplicaId();
// Need 3 node to require a 2-node majority
- Set<Integer> voters = Utils.mkSet(localId, 1, 2);
+ Set<Integer> voters = Utils.mkSet(localId, localId + 1, localId + 2);
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
.withVotedCandidate(2, ReplicaKey.of(localId,
ReplicaKey.NO_DIRECTORY_ID))
@@ -811,8 +829,8 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testInitializeAsCandidateAndBecomeLeader(boolean
withKip853Rpc) throws Exception {
- int localId = 0;
- final int otherNodeId = 1;
+ final int localId = randomReplicaId();
+ final int otherNodeId = localId + 1;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
.withKip853Rpc(withKip853Rpc)
@@ -857,9 +875,9 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testInitializeAsCandidateAndBecomeLeaderQuorumOfThree(boolean
withKip853Rpc) throws Exception {
- int localId = 0;
- final int firstNodeId = 1;
- final int secondNodeId = 2;
+ int localId = randomReplicaId();
+ final int firstNodeId = localId + 1;
+ final int secondNodeId = localId + 2;
Set<Integer> voters = Utils.mkSet(localId, firstNodeId, secondNodeId);
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
.withKip853Rpc(withKip853Rpc)
@@ -878,6 +896,10 @@ public class KafkaRaftClientTest {
context.voteResponse(true, OptionalInt.empty(), 1)
);
+ VoteRequestData voteRequest = (VoteRequestData) request.data();
+ int voterId = voteRequest.voterId();
+ assertNotEquals(localId, voterId);
+
// Become leader after receiving the vote
context.pollUntil(() -> context.log.endOffset().offset() == 1L);
context.assertElectedLeader(1, localId);
@@ -898,14 +920,14 @@ public class KafkaRaftClientTest {
Record record = batch.iterator().next();
assertEquals(electionTimestamp, record.timestamp());
RaftClientTestContext.verifyLeaderChangeMessage(localId,
Arrays.asList(localId, firstNodeId, secondNodeId),
- Arrays.asList(firstNodeId, localId), record.key(), record.value());
+ Arrays.asList(voterId, localId), record.key(), record.value());
}
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testHandleBeginQuorumRequest(boolean withKip853Rpc) throws
Exception {
- int localId = 0;
- ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+ int localId = randomReplicaId();
+ ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
int votedCandidateEpoch = 2;
Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
@@ -929,8 +951,8 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testHandleBeginQuorumResponse(boolean withKip853Rpc) throws
Exception {
- int localId = 0;
- int otherNodeId = 1;
+ int localId = randomReplicaId();
+ int otherNodeId = localId + 1;
int leaderEpoch = 2;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
@@ -948,8 +970,8 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testEndQuorumIgnoredAsCandidateIfOlderEpoch(boolean
withKip853Rpc) throws Exception {
- int localId = 0;
- int otherNodeId = 1;
+ int localId = randomReplicaId();
+ int otherNodeId = localId + 1;
int epoch = 5;
int jitterMs = 85;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
@@ -995,7 +1017,7 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testEndQuorumIgnoredAsLeaderIfOlderEpoch(boolean
withKip853Rpc) throws Exception {
- int localId = 0;
+ int localId = randomReplicaId();
int voter2 = localId + 1;
ReplicaKey voter3 = replicaKey(localId + 2, withKip853Rpc);
Set<Integer> voters = Utils.mkSet(localId, voter2, voter3.id());
@@ -1026,7 +1048,7 @@ public class KafkaRaftClientTest {
public void testEndQuorumStartsNewElectionImmediatelyIfFollowerUnattached(
boolean withKip853Rpc
) throws Exception {
- int localId = 0;
+ int localId = randomReplicaId();
int voter2 = localId + 1;
ReplicaKey voter3 = replicaKey(localId + 2, withKip853Rpc);
int epoch = 2;
@@ -1056,8 +1078,8 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testAccumulatorClearedAfterBecomingFollower(boolean
withKip853Rpc) throws Exception {
- int localId = 0;
- int otherNodeId = 1;
+ int localId = randomReplicaId();
+ int otherNodeId = localId + 1;
int lingerMs = 50;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
@@ -1088,8 +1110,8 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testAccumulatorClearedAfterBecomingVoted(boolean
withKip853Rpc) throws Exception {
- int localId = 0;
- ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+ int localId = randomReplicaId();
+ ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
int lingerMs = 50;
Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
@@ -1121,8 +1143,8 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testAccumulatorClearedAfterBecomingUnattached(boolean
withKip853Rpc) throws Exception {
- int localId = 0;
- ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+ int localId = randomReplicaId();
+ ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
int lingerMs = 50;
Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
@@ -1155,9 +1177,8 @@ public class KafkaRaftClientTest {
public void testChannelWokenUpIfLingerTimeoutReachedWithoutAppend(boolean
withKip853Rpc) throws Exception {
// This test verifies that the client will set its poll timeout
accounting
// for the lingerMs of a pending append
-
- int localId = 0;
- int otherNodeId = 1;
+ int localId = randomReplicaId();
+ int otherNodeId = localId + 1;
int lingerMs = 50;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
@@ -1191,9 +1212,8 @@ public class KafkaRaftClientTest {
public void testChannelWokenUpIfLingerTimeoutReachedDuringAppend(boolean
withKip853Rpc) throws Exception {
// This test verifies that the client will get woken up immediately
// if the linger timeout has expired during an append
-
- int localId = 0;
- int otherNodeId = 1;
+ int localId = randomReplicaId();
+ int otherNodeId = localId + 1;
int lingerMs = 50;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
@@ -1225,8 +1245,8 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testHandleEndQuorumRequest(boolean withKip853Rpc) throws
Exception {
- int localId = 0;
- int oldLeaderId = 1;
+ int localId = randomReplicaId();
+ int oldLeaderId = randomReplicaId() + 1;
int leaderEpoch = 2;
Set<Integer> voters = Utils.mkSet(localId, oldLeaderId);
@@ -1253,10 +1273,10 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void
testHandleEndQuorumRequestWithLowerPriorityToBecomeLeader(boolean
withKip853Rpc) throws Exception {
- int localId = 0;
- ReplicaKey oldLeaderKey = replicaKey(1, withKip853Rpc);
+ int localId = randomReplicaId();
+ ReplicaKey oldLeaderKey = replicaKey(localId + 1, withKip853Rpc);
int leaderEpoch = 2;
- ReplicaKey preferredNextLeader = replicaKey(3, withKip853Rpc);
+ ReplicaKey preferredNextLeader = replicaKey(localId + 2,
withKip853Rpc);
Set<Integer> voters = Utils.mkSet(localId, oldLeaderKey.id(),
preferredNextLeader.id());
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
@@ -1296,9 +1316,9 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testVoteRequestTimeout(boolean withKip853Rpc) throws Exception
{
- int localId = 0;
+ int localId = randomReplicaId();
+ int otherNodeId = localId + 1;
int epoch = 1;
- int otherNodeId = 1;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
@@ -1338,9 +1358,9 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testHandleValidVoteRequestAsFollower(boolean withKip853Rpc)
throws Exception {
- int localId = 0;
+ int localId = randomReplicaId();
int epoch = 2;
- ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+ ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
@@ -1359,10 +1379,10 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testHandleVoteRequestAsFollowerWithElectedLeader(boolean
withKip853Rpc) throws Exception {
- int localId = 0;
+ int localId = randomReplicaId();
int epoch = 2;
- ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
- int electedLeaderId = 3;
+ ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
+ int electedLeaderId = localId + 2;
Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id(),
electedLeaderId);
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
@@ -1381,10 +1401,10 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testHandleVoteRequestAsFollowerWithVotedCandidate(boolean
withKip853Rpc) throws Exception {
- int localId = 0;
+ int localId = randomReplicaId();
int epoch = 2;
- ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
- ReplicaKey votedCandidateKey = replicaKey(3, withKip853Rpc);
+ ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
+ ReplicaKey votedCandidateKey = replicaKey(localId + 2, withKip853Rpc);
Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id(),
votedCandidateKey.id());
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
@@ -1402,9 +1422,9 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testHandleInvalidVoteRequestWithOlderEpoch(boolean
withKip853Rpc) throws Exception {
- int localId = 0;
+ int localId = randomReplicaId();
int epoch = 2;
- ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+ ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
@@ -1422,10 +1442,10 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testHandleVoteRequestAsObserver(boolean withKip853Rpc) throws
Exception {
- int localId = 0;
+ int localId = randomReplicaId();
int epoch = 2;
- ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
- int otherNodeId2 = 2;
+ ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
+ int otherNodeId2 = localId + 2;
Set<Integer> voters = Utils.mkSet(otherNodeKey.id(), otherNodeId2);
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
@@ -1443,8 +1463,8 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testLeaderIgnoreVoteRequestOnSameEpoch(boolean withKip853Rpc)
throws Exception {
- int localId = 0;
- ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+ int localId = randomReplicaId();
+ ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
@@ -1466,8 +1486,8 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testListenerCommitCallbackAfterLeaderWrite(boolean
withKip853Rpc) throws Exception {
- int localId = 0;
- ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+ int localId = randomReplicaId();
+ ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
@@ -1511,8 +1531,8 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testLeaderImmediatelySendsDivergingEpoch(boolean
withKip853Rpc) throws Exception {
- int localId = 0;
- ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+ int localId = randomReplicaId();
+ ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
@@ -1543,8 +1563,8 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testCandidateIgnoreVoteRequestOnSameEpoch(boolean
withKip853Rpc) throws Exception {
- int localId = 0;
- ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+ int localId = randomReplicaId();
+ ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
int leaderEpoch = 2;
Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
@@ -1564,8 +1584,8 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testRetryElection(boolean withKip853Rpc) throws Exception {
- int localId = 0;
- int otherNodeId = 1;
+ int localId = randomReplicaId();
+ int otherNodeId = localId + 1;
int epoch = 1;
int exponentialFactor = 85; // set it large enough so that we will
bound on jitter
Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
@@ -1611,8 +1631,8 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testInitializeAsFollowerEmptyLog(boolean withKip853Rpc) throws
Exception {
- int localId = 0;
- int otherNodeId = 1;
+ int localId = randomReplicaId();
+ int otherNodeId = localId + 1;
int epoch = 5;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
@@ -1631,8 +1651,8 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testInitializeAsFollowerNonEmptyLog(boolean withKip853Rpc)
throws Exception {
- int localId = 0;
- int otherNodeId = 1;
+ int localId = randomReplicaId();
+ int otherNodeId = localId + 1;
int epoch = 5;
int lastEpoch = 3;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
@@ -1652,8 +1672,8 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testVoterBecomeCandidateAfterFetchTimeout(boolean
withKip853Rpc) throws Exception {
- int localId = 0;
- int otherNodeId = 1;
+ int localId = randomReplicaId();
+ int otherNodeId = localId + 1;
int epoch = 5;
int lastEpoch = 3;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
@@ -1679,9 +1699,9 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testInitializeObserverNoPreviousState(boolean withKip853Rpc)
throws Exception {
- int localId = 0;
- int leaderId = 1;
- int otherNodeId = 2;
+ int localId = randomReplicaId();
+ int leaderId = localId + 1;
+ int otherNodeId = localId + 2;
int epoch = 5;
Set<Integer> voters = Utils.mkSet(leaderId, otherNodeId);
@@ -1707,8 +1727,8 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testObserverQuorumDiscoveryFailure(boolean withKip853Rpc)
throws Exception {
- int localId = 0;
- int leaderId = 1;
+ int localId = randomReplicaId();
+ int leaderId = localId + 1;
int epoch = 5;
Set<Integer> voters = Utils.mkSet(leaderId);
List<InetSocketAddress> bootstrapServers = voters
@@ -1753,9 +1773,9 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testObserverSendDiscoveryFetchAfterFetchTimeout(boolean
withKip853Rpc) throws Exception {
- int localId = 0;
- int leaderId = 1;
- int otherNodeId = 2;
+ int localId = randomReplicaId();
+ int leaderId = localId + 1;
+ int otherNodeId = localId + 2;
int epoch = 5;
Set<Integer> voters = Utils.mkSet(leaderId, otherNodeId);
List<InetSocketAddress> bootstrapServers = voters
@@ -1796,10 +1816,9 @@ public class KafkaRaftClientTest {
public void testObserverHandleRetryFetchtToBootstrapServer(boolean
withKip853Rpc) throws Exception {
// This test tries to check that KRaft is able to handle a retrying
Fetch request to
// a boostrap server after a Fetch request to the leader.
-
- int localId = 0;
- int leaderId = 1;
- int otherNodeId = 2;
+ int localId = randomReplicaId();
+ int leaderId = localId + 1;
+ int otherNodeId = localId + 2;
int epoch = 5;
Set<Integer> voters = Utils.mkSet(leaderId, otherNodeId);
List<InetSocketAddress> bootstrapServers = voters
@@ -1871,10 +1890,9 @@ public class KafkaRaftClientTest {
public void testObserverHandleRetryFetchToLeader(boolean withKip853Rpc)
throws Exception {
// This test tries to check that KRaft is able to handle a retrying
Fetch request to
// the leader after a Fetch request to the bootstrap server.
-
- int localId = 0;
- int leaderId = 1;
- int otherNodeId = 2;
+ int localId = randomReplicaId();
+ int leaderId = localId + 1;
+ int otherNodeId = localId + 2;
int epoch = 5;
Set<Integer> voters = Utils.mkSet(leaderId, otherNodeId);
List<InetSocketAddress> bootstrapServers = voters
@@ -1929,8 +1947,8 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testInvalidFetchRequest(boolean withKip853Rpc) throws
Exception {
- int localId = 0;
- ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+ int localId = randomReplicaId();
+ ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
@@ -1974,8 +1992,9 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@MethodSource("validFetchVersions")
public void testLeaderStateUpdateWithDifferentFetchRequestVersions(short
version) throws Exception {
- int localId = 0;
- ReplicaKey otherNodeKey = replicaKey(1, false);
+ int localId = randomReplicaId();
+ int otherNodeId = localId + 1;
+ ReplicaKey otherNodeKey = replicaKey(otherNodeId, false);
int epoch = 5;
Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
@@ -1989,8 +2008,8 @@ public class KafkaRaftClientTest {
// Now we will advance the high watermark with a follower fetch
request.
FetchRequestData fetchRequestData = context.fetchRequest(epoch,
otherNodeKey, 1L, epoch, 0);
FetchRequestData request = new
FetchRequest.SimpleBuilder(fetchRequestData).build(version).data();
- assertEquals((version < 15) ? 1 : -1, fetchRequestData.replicaId());
- assertEquals((version < 15) ? -1 : 1,
fetchRequestData.replicaState().replicaId());
+ assertEquals((version < 15) ? otherNodeId : -1,
fetchRequestData.replicaId());
+ assertEquals((version < 15) ? -1 : otherNodeId,
fetchRequestData.replicaState().replicaId());
context.deliverRequest(request, version);
context.pollUntilResponse();
context.assertSentFetchPartitionResponse(Errors.NONE, epoch,
OptionalInt.of(localId));
@@ -2000,8 +2019,8 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testFetchRequestClusterIdValidation(boolean withKip853Rpc)
throws Exception {
- int localId = 0;
- ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+ int localId = randomReplicaId();
+ ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
@@ -2036,8 +2055,8 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testVoteRequestClusterIdValidation(boolean withKip853Rpc)
throws Exception {
- int localId = 0;
- ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+ int localId = randomReplicaId();
+ ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
@@ -2070,8 +2089,8 @@ public class KafkaRaftClientTest {
@Test
public void testInvalidVoterReplicaVoteRequest() throws Exception {
- int localId = 0;
- ReplicaKey otherNodeKey = replicaKey(1, true);
+ int localId = randomReplicaId();
+ ReplicaKey otherNodeKey = replicaKey(localId + 1, true);
Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
@@ -2112,12 +2131,11 @@ public class KafkaRaftClientTest {
@Test
public void testInvalidVoterReplicaBeginQuorumEpochRequest() throws
Exception {
- int localId = 0;
- int voter1 = localId;
+ int localId = randomReplicaId();
int voter2 = localId + 1;
int voter3 = localId + 2;
int epoch = 5;
- Set<Integer> voters = Utils.mkSet(voter1, voter2, voter3);
+ Set<Integer> voters = Utils.mkSet(localId, voter2, voter3);
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
.withUnknownLeader(epoch - 1)
@@ -2167,8 +2185,8 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testBeginQuorumEpochRequestClusterIdValidation(boolean
withKip853Rpc) throws Exception {
- int localId = 0;
- int otherNodeId = 1;
+ int localId = randomReplicaId();
+ int otherNodeId = localId + 1;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
@@ -2203,8 +2221,8 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testEndQuorumEpochRequestClusterIdValidation(boolean
withKip853Rpc) throws Exception {
- int localId = 0;
- ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+ int localId = randomReplicaId();
+ ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
@@ -2239,8 +2257,8 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testLeaderAcceptVoteFromNonVoter(boolean withKip853Rpc) throws
Exception {
- int localId = 0;
- int otherNodeId = 1;
+ int localId = randomReplicaId();
+ int otherNodeId = localId + 1;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
@@ -2251,7 +2269,7 @@ public class KafkaRaftClientTest {
context.becomeLeader();
int epoch = context.currentEpoch();
- ReplicaKey nonVoterKey = replicaKey(2, withKip853Rpc);
+ ReplicaKey nonVoterKey = replicaKey(localId + 2, withKip853Rpc);
context.deliverRequest(context.voteRequest(epoch - 1, nonVoterKey, 0,
0));
context.client.poll();
context.assertSentVoteResponse(Errors.FENCED_LEADER_EPOCH, epoch,
OptionalInt.of(localId), false);
@@ -2264,8 +2282,8 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testInvalidVoteRequest(boolean withKip853Rpc) throws Exception
{
- int localId = 0;
- ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+ int localId = randomReplicaId();
+ ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
int epoch = 5;
Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
@@ -2309,8 +2327,8 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testPurgatoryFetchTimeout(boolean withKip853Rpc) throws
Exception {
- int localId = 0;
- ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+ int localId = randomReplicaId();
+ ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
@@ -2337,8 +2355,8 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testPurgatoryFetchSatisfiedByWrite(boolean withKip853Rpc)
throws Exception {
- int localId = 0;
- ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+ int localId = randomReplicaId();
+ ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
@@ -2355,7 +2373,7 @@ public class KafkaRaftClientTest {
assertEquals(0, context.channel.drainSendQueue().size());
// Append some records that can fulfill the Fetch request
- String[] appendRecords = new String[] {"a", "b", "c"};
+ String[] appendRecords = new String[]{"a", "b", "c"};
context.client.scheduleAppend(epoch, Arrays.asList(appendRecords));
context.client.poll();
@@ -2366,11 +2384,10 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testPurgatoryFetchCompletedByFollowerTransition(boolean
withKip853Rpc) throws Exception {
- int localId = 0;
- int voter1 = localId;
+ int localId = randomReplicaId();
ReplicaKey voterKey2 = replicaKey(localId + 1, withKip853Rpc);
int voter3 = localId + 2;
- Set<Integer> voters = Utils.mkSet(voter1, voterKey2.id(), voter3);
+ Set<Integer> voters = Utils.mkSet(localId, voterKey2.id(), voter3);
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
.withUnknownLeader(4)
@@ -2403,8 +2420,8 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testFetchResponseIgnoredAfterBecomingCandidate(boolean
withKip853Rpc) throws Exception {
- int localId = 0;
- int otherNodeId = 1;
+ int localId = randomReplicaId();
+ int otherNodeId = localId + 1;
int epoch = 5;
// The other node starts out as the leader
Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
@@ -2442,13 +2459,12 @@ public class KafkaRaftClientTest {
public void testFetchResponseIgnoredAfterBecomingFollowerOfDifferentLeader(
boolean withKip853Rpc
) throws Exception {
- int localId = 0;
- int voter1 = localId;
+ int localId = randomReplicaId();
int voter2 = localId + 1;
int voter3 = localId + 2;
int epoch = 5;
// Start out with `voter2` as the leader
- Set<Integer> voters = Utils.mkSet(voter1, voter2, voter3);
+ Set<Integer> voters = Utils.mkSet(localId, voter2, voter3);
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
.withElectedLeader(epoch, voter2)
@@ -2482,12 +2498,11 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testVoteResponseIgnoredAfterBecomingFollower(boolean
withKip853Rpc) throws Exception {
- int localId = 0;
- int voter1 = localId;
+ int localId = randomReplicaId();
int voter2 = localId + 1;
int voter3 = localId + 2;
int epoch = 5;
- Set<Integer> voters = Utils.mkSet(voter1, voter2, voter3);
+ Set<Integer> voters = Utils.mkSet(localId, voter2, voter3);
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
.withUnknownLeader(epoch - 1)
@@ -2531,9 +2546,9 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void
testObserverLeaderRediscoveryAfterBrokerNotAvailableError(boolean
withKip853Rpc) throws Exception {
- int localId = 0;
- int leaderId = 1;
- int otherNodeId = 2;
+ int localId = randomReplicaId();
+ int leaderId = localId + 1;
+ int otherNodeId = localId + 2;
int epoch = 5;
Set<Integer> voters = Utils.mkSet(leaderId, otherNodeId);
List<InetSocketAddress> bootstrapServers = voters
@@ -2582,9 +2597,9 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testObserverLeaderRediscoveryAfterRequestTimeout(boolean
withKip853Rpc) throws Exception {
- int localId = 0;
- int leaderId = 1;
- int otherNodeId = 2;
+ int localId = randomReplicaId();
+ int leaderId = localId + 1;
+ int otherNodeId = localId + 2;
int epoch = 5;
Set<Integer> voters = Utils.mkSet(leaderId, otherNodeId);
List<InetSocketAddress> bootstrapServers = voters
@@ -2627,8 +2642,8 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testLeaderGracefulShutdown(boolean withKip853Rpc) throws
Exception {
- int localId = 0;
- ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+ int localId = randomReplicaId();
+ ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
@@ -2674,9 +2689,9 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testEndQuorumEpochSentBasedOnFetchOffset(boolean
withKip853Rpc) throws Exception {
- int localId = 0;
- ReplicaKey closeFollower = replicaKey(2, withKip853Rpc);
- ReplicaKey laggingFollower = replicaKey(1, withKip853Rpc);
+ int localId = randomReplicaId();
+ ReplicaKey closeFollower = replicaKey(localId + 2, withKip853Rpc);
+ ReplicaKey laggingFollower = replicaKey(localId + 1, withKip853Rpc);
Set<Integer> voters = Utils.mkSet(localId, closeFollower.id(),
laggingFollower.id());
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
@@ -2719,7 +2734,7 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testDescribeQuorumNonLeader(boolean withKip853Rpc) throws
Exception {
- int localId = 0;
+ int localId = randomReplicaId();
ReplicaKey voter2 = replicaKey(localId + 1, withKip853Rpc);
ReplicaKey voter3 = replicaKey(localId + 2, withKip853Rpc);
int epoch = 2;
@@ -2750,9 +2765,9 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testDescribeQuorum(boolean withKip853Rpc) throws Exception {
- int localId = 0;
- ReplicaKey closeFollower = replicaKey(2, withKip853Rpc);
- ReplicaKey laggingFollower = replicaKey(1, withKip853Rpc);
+ int localId = randomReplicaId();
+ ReplicaKey closeFollower = replicaKey(localId + 2, withKip853Rpc);
+ ReplicaKey laggingFollower = replicaKey(localId + 1, withKip853Rpc);
Set<Integer> voters = Utils.mkSet(localId, closeFollower.id(),
laggingFollower.id());
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
@@ -2777,7 +2792,7 @@ public class KafkaRaftClientTest {
context.assertSentFetchPartitionResponse(3L, epoch);
// Create observer
- ReplicaKey observerId = replicaKey(3, withKip853Rpc);
+ ReplicaKey observerId = replicaKey(localId + 3, withKip853Rpc);
context.time.sleep(100);
long observerFetchTime = context.time.milliseconds();
context.deliverRequest(context.fetchRequest(epoch, observerId, 0L, 0,
0));
@@ -2823,8 +2838,8 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testLeaderGracefulShutdownTimeout(boolean withKip853Rpc)
throws Exception {
- int localId = 0;
- int otherNodeId = 1;
+ int localId = randomReplicaId();
+ int otherNodeId = localId + 1;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
@@ -2861,8 +2876,8 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testFollowerGracefulShutdown(boolean withKip853Rpc) throws
Exception {
- int localId = 0;
- int otherNodeId = 1;
+ int localId = randomReplicaId();
+ int otherNodeId = localId + 1;
int epoch = 5;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
@@ -2888,9 +2903,9 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testObserverGracefulShutdown(boolean withKip853Rpc) throws
Exception {
- int localId = 0;
- int voter1 = 1;
- int voter2 = 2;
+ int localId = randomReplicaId();
+ int voter1 = localId + 1;
+ int voter2 = localId + 2;
Set<Integer> voters = Utils.mkSet(voter1, voter2);
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
@@ -2915,7 +2930,7 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testGracefulShutdownSingleMemberQuorum(boolean withKip853Rpc)
throws IOException {
- int localId = 0;
+ int localId = randomReplicaId();
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, Collections.singleton(localId))
.withKip853Rpc(withKip853Rpc)
.build();
@@ -2933,8 +2948,8 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testFollowerReplication(boolean withKip853Rpc) throws
Exception {
- int localId = 0;
- int otherNodeId = 1;
+ int localId = randomReplicaId();
+ int otherNodeId = localId + 1;
int epoch = 5;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
@@ -2963,8 +2978,8 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testEmptyRecordSetInFetchResponse(boolean withKip853Rpc)
throws Exception {
- int localId = 0;
- int otherNodeId = 1;
+ int localId = randomReplicaId();
+ int otherNodeId = localId + 1;
int epoch = 5;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
@@ -3030,8 +3045,8 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testFetchShouldBeTreatedAsLeaderAcknowledgement(boolean
withKip853Rpc) throws Exception {
- int localId = 0;
- ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+ int localId = randomReplicaId();
+ ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
int epoch = 5;
Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
@@ -3066,7 +3081,7 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testLeaderAppendSingleMemberQuorum(boolean withKip853Rpc)
throws Exception {
- int localId = 0;
+ int localId = randomReplicaId();
Set<Integer> voters = Collections.singleton(localId);
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
@@ -3080,7 +3095,7 @@ public class KafkaRaftClientTest {
// We still write the leader change message
assertEquals(OptionalLong.of(1L), context.client.highWatermark());
- String[] appendRecords = new String[] {"a", "b", "c"};
+ String[] appendRecords = new String[]{"a", "b", "c"};
// First poll has no high watermark advance
context.client.poll();
@@ -3093,7 +3108,7 @@ public class KafkaRaftClientTest {
assertEquals(OptionalLong.of(4L), context.client.highWatermark());
// Now try reading it
- ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+ ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
List<MutableRecordBatch> batches = new ArrayList<>(2);
boolean appended = true;
@@ -3142,8 +3157,8 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testFollowerLogReconciliation(boolean withKip853Rpc) throws
Exception {
- int localId = 0;
- int otherNodeId = 1;
+ int localId = randomReplicaId();
+ int otherNodeId = localId + 1;
int epoch = 5;
int lastEpoch = 3;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
@@ -3179,7 +3194,7 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testMetrics(boolean withKip853Rpc) throws Exception {
- int localId = 0;
+ int localId = randomReplicaId();
int epoch = 1;
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, Collections.singleton(localId))
.withKip853Rpc(withKip853Rpc)
@@ -3226,8 +3241,8 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testClusterAuthorizationFailedInFetch(boolean withKip853Rpc)
throws Exception {
- int localId = 0;
- int otherNodeId = 1;
+ int localId = randomReplicaId();
+ int otherNodeId = localId + 1;
int epoch = 5;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
@@ -3254,8 +3269,8 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testClusterAuthorizationFailedInBeginQuorumEpoch(boolean
withKip853Rpc) throws Exception {
- int localId = 0;
- int otherNodeId = 1;
+ int localId = randomReplicaId();
+ int otherNodeId = localId + 1;
int epoch = 5;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
@@ -3283,8 +3298,8 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testClusterAuthorizationFailedInVote(boolean withKip853Rpc)
throws Exception {
- int localId = 0;
- int otherNodeId = 1;
+ int localId = randomReplicaId();
+ int otherNodeId = localId + 1;
int epoch = 5;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
@@ -3309,8 +3324,8 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testClusterAuthorizationFailedInEndQuorumEpoch(boolean
withKip853Rpc) throws Exception {
- int localId = 0;
- int otherNodeId = 1;
+ int localId = randomReplicaId();
+ int otherNodeId = localId + 1;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
@@ -3337,8 +3352,8 @@ public class KafkaRaftClientTest {
public void
testHandleLeaderChangeFiresAfterListenerReachesEpochStartOffsetOnEmptyLog(
boolean withKip853Rpc
) throws Exception {
- int localId = 0;
- ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+ int localId = randomReplicaId();
+ ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
@@ -3384,8 +3399,8 @@ public class KafkaRaftClientTest {
public void
testHandleLeaderChangeFiresAfterListenerReachesEpochStartOffset(
boolean withKip853Rpc
) throws Exception {
- int localId = 0;
- ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+ int localId = randomReplicaId();
+ ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
int epoch = 5;
Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
@@ -3451,8 +3466,8 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testLateRegisteredListenerCatchesUp(boolean withKip853Rpc)
throws Exception {
- int localId = 0;
- ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+ int localId = randomReplicaId();
+ ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
int epoch = 5;
Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
@@ -3494,8 +3509,8 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testReregistrationChangesListenerContext(boolean
withKip853Rpc) throws Exception {
- int localId = 0;
- int otherNodeId = 1;
+ int localId = randomReplicaId();
+ int otherNodeId = localId + 1;
int epoch = 5;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
@@ -3537,8 +3552,8 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void
testHandleCommitCallbackFiresAfterFollowerHighWatermarkAdvances(boolean
withKip853Rpc) throws Exception {
- int localId = 0;
- int otherNodeId = 1;
+ int localId = randomReplicaId();
+ int otherNodeId = localId + 1;
int epoch = 5;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
@@ -3598,9 +3613,8 @@ public class KafkaRaftClientTest {
public void testHandleCommitCallbackFiresInVotedState(boolean
withKip853Rpc) throws Exception {
// This test verifies that the state machine can still catch up even
while
// an election is in progress as long as the high watermark is known.
-
- int localId = 0;
- ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+ int localId = randomReplicaId();
+ ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
int epoch = 7;
Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
@@ -3646,9 +3660,8 @@ public class KafkaRaftClientTest {
public void testHandleCommitCallbackFiresInCandidateState(boolean
withKip853Rpc) throws Exception {
// This test verifies that the state machine can still catch up even
while
// an election is in progress as long as the high watermark is known.
-
- int localId = 0;
- ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+ int localId = randomReplicaId();
+ ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
int epoch = 7;
Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
@@ -3705,9 +3718,8 @@ public class KafkaRaftClientTest {
// When registering a listener while the replica is unattached, it
should get notified
// with the current epoch
// When transitioning to follower, expect another notification with
the leader and epoch
-
- int localId = 0;
- int otherNodeId = 1;
+ int localId = randomReplicaId();
+ int otherNodeId = localId + 1;
int epoch = 7;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
@@ -3741,9 +3753,8 @@ public class KafkaRaftClientTest {
public void testHandleLeaderChangeFiresAfterFollowerRegistration(boolean
withKip853Rpc) throws Exception {
// When registering a listener while the replica is a follower, it
should get notified with
// the current leader and epoch
-
- int localId = 0;
- int otherNodeId = 1;
+ int localId = randomReplicaId();
+ int otherNodeId = localId + 1;
int epoch = 7;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
@@ -3768,8 +3779,8 @@ public class KafkaRaftClientTest {
public void testObserverFetchWithNoLocalId(boolean withKip853Rpc) throws
Exception {
// When no `localId` is defined, the client will behave as an observer.
// This is designed for tooling/debugging use cases.
-
- Set<Integer> voters = Utils.mkSet(1, 2);
+ int leaderId = randomReplicaId();
+ Set<Integer> voters = Utils.mkSet(leaderId, leaderId + 1);
List<InetSocketAddress> bootstrapServers = voters
.stream()
.map(RaftClientTestContext::mockAddress)
@@ -3788,7 +3799,6 @@ public class KafkaRaftClientTest {
context.assertFetchRequestData(fetchRequest1, 0, 0L, 0);
int leaderEpoch = 5;
- int leaderId = 1;
context.deliverResponse(
fetchRequest1.correlationId(),
@@ -3822,10 +3832,10 @@ public class KafkaRaftClientTest {
}
@ParameterizedTest
- @CsvSource({"false,false", "false,true", "true,false", "true,true"})
+ @CsvSource({ "false,false", "false,true", "true,false", "true,true" })
public void testAppendWithRequiredBaseOffset(boolean correctOffset,
boolean withKip853Rpc) throws Exception {
- int localId = 0;
- int otherNodeId = 1;
+ int localId = randomReplicaId();
+ int otherNodeId = localId + 1;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
@@ -3854,4 +3864,8 @@ public class KafkaRaftClientTest {
Uuid directoryId = withDirectoryId ? Uuid.randomUuid() :
ReplicaKey.NO_DIRECTORY_ID;
return ReplicaKey.of(id, directoryId);
}
+
+ private static int randomReplicaId() {
+ return ThreadLocalRandom.current().nextInt(1025);
+ }
}
diff --git
a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index 31d696484f3..600130b41ab 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -71,6 +71,7 @@ import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -657,6 +658,18 @@ public final class RaftClientTestContext {
partitionData,
nodes
);
+
+ List<ReplicaState> sortedVoters = response
+ .topics()
+ .get(0)
+ .partitions()
+ .get(0)
+ .currentVoters()
+ .stream()
+ .sorted(Comparator.comparingInt(ReplicaState::replicaId))
+ .collect(Collectors.toList());
+
response.topics().get(0).partitions().get(0).setCurrentVoters(sortedVoters);
+
assertEquals(expectedResponse, response);
}
@@ -691,10 +704,12 @@ public final class RaftClientTestContext {
VoteResponseData.PartitionData partitionResponse =
response.topics().get(0).partitions().get(0);
- assertEquals(voteGranted, partitionResponse.voteGranted());
- assertEquals(error, Errors.forCode(partitionResponse.errorCode()));
- assertEquals(epoch, partitionResponse.leaderEpoch());
+ String voterIdDebugLog = "Leader Id: " + leaderId +
+ " Partition response leader Id: " + partitionResponse.leaderId();
+ assertEquals(voteGranted, partitionResponse.voteGranted(),
voterIdDebugLog);
+ assertEquals(error, Errors.forCode(partitionResponse.errorCode()),
voterIdDebugLog);
assertEquals(leaderId.orElse(-1), partitionResponse.leaderId());
+ assertEquals(epoch, partitionResponse.leaderEpoch());
if (kip853Rpc && leaderId.isPresent()) {
Endpoints expectedLeaderEndpoints =
startingVoters.listeners(leaderId.getAsInt());
@@ -795,7 +810,7 @@ public final class RaftClientTestContext {
}
void assertSentBeginQuorumEpochResponse(
- Errors responseError
+ Errors responseError
) {
List<RaftResponse.Outbound> sentMessages =
drainSentResponses(ApiKeys.BEGIN_QUORUM_EPOCH);
assertEquals(1, sentMessages.size());
@@ -840,7 +855,12 @@ public final class RaftClientTestContext {
assertEquals(epoch, partitionResponse.leaderEpoch());
assertEquals(leaderId.orElse(-1), partitionResponse.leaderId());
- assertEquals(partitionError,
Errors.forCode(partitionResponse.errorCode()));
+ assertEquals(
+ partitionError,
+ Errors.forCode(partitionResponse.errorCode()),
+ "Leader Id: " + leaderId +
+ " Partition response leader Id: " + partitionResponse.leaderId()
+ );
if (kip853Rpc && leaderId.isPresent()) {
Endpoints expectedLeaderEndpoints =
startingVoters.listeners(leaderId.getAsInt());
@@ -1318,7 +1338,11 @@ public final class RaftClientTestContext {
.stream()
.map(voterId -> new Voter().setVoterId(voterId))
.collect(Collectors.toList()),
- leaderChangeMessage.voters()
+ leaderChangeMessage
+ .voters()
+ .stream()
+ .sorted(Comparator.comparingInt(Voter::voterId))
+ .collect(Collectors.toList())
);
assertEquals(
grantingVoters