This is an automated email from the ASF dual-hosted git repository.
jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2cf87bff9b6 KAFKA-16953; Properly implement the sending of DescribeQuorumResponse (#16637)
2cf87bff9b6 is described below
commit 2cf87bff9b6b5136a22539edf48b2d7cc668bdf9
Author: Alyssa Huang <[email protected]>
AuthorDate: Mon Jul 29 11:36:17 2024 -0700
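KAFKA-16953; Properly implement the sending of DescribeQuorumResponse (#16637)
This change makes the KRaft leader build the DescribeQuorumResponse
according to the API version of the client's request. For version 2
and later, the response also includes the replica directory ID of each
voter and observer, and the listener endpoints of each voter.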
Reviewers: José Armando García Sancio <[email protected]>
---
.../main/java/org/apache/kafka/raft/Endpoints.java | 14 +
.../org/apache/kafka/raft/KafkaRaftClient.java | 8 +-
.../java/org/apache/kafka/raft/LeaderState.java | 112 ++---
.../main/java/org/apache/kafka/raft/RaftUtil.java | 73 ++-
.../org/apache/kafka/raft/internals/VoterSet.java | 5 +-
.../org/apache/kafka/raft/KafkaRaftClientTest.java | 427 ++++++++++++++--
.../org/apache/kafka/raft/LeaderStateTest.java | 543 ---------------------
.../apache/kafka/raft/RaftClientTestContext.java | 29 +-
8 files changed, 542 insertions(+), 669 deletions(-)
diff --git a/raft/src/main/java/org/apache/kafka/raft/Endpoints.java
b/raft/src/main/java/org/apache/kafka/raft/Endpoints.java
index 41292252ae6..5e979022ec3 100644
--- a/raft/src/main/java/org/apache/kafka/raft/Endpoints.java
+++ b/raft/src/main/java/org/apache/kafka/raft/Endpoints.java
@@ -19,6 +19,7 @@ package org.apache.kafka.raft;
import org.apache.kafka.common.message.AddRaftVoterRequestData;
import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
+import org.apache.kafka.common.message.DescribeQuorumResponseData;
import org.apache.kafka.common.message.EndQuorumEpochRequestData;
import org.apache.kafka.common.message.EndQuorumEpochResponseData;
import org.apache.kafka.common.message.FetchResponseData;
@@ -113,7 +114,20 @@ public final class Endpoints {
.setPort(entry.getValue().getPort())
);
}
+ return listeners;
+ }
+ public DescribeQuorumResponseData.ListenerCollection
toDescribeQuorumResponseListeners() {
+ DescribeQuorumResponseData.ListenerCollection listeners =
+ new
DescribeQuorumResponseData.ListenerCollection(endpoints.size());
+ for (Map.Entry<ListenerName, InetSocketAddress> entry :
endpoints.entrySet()) {
+ listeners.add(
+ new DescribeQuorumResponseData.Listener()
+ .setName(entry.getKey().value())
+ .setHost(entry.getValue().getHostString())
+ .setPort(entry.getValue().getPort())
+ );
+ }
return listeners;
}
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 88de2e94d21..0b454666db3 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -1706,8 +1706,12 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
return RaftUtil.singletonDescribeQuorumResponse(
requestMetadata.apiVersion(),
log.topicPartition(),
- leaderState.describeQuorum(currentTimeMs),
- leaderState.nodes(currentTimeMs)
+ quorum.localIdOrThrow(),
+ leaderState.epoch(),
+
leaderState.highWatermark().map(LogOffsetMetadata::offset).orElse(-1L),
+ leaderState.voterStates().values(),
+ leaderState.observerStates(currentTimeMs).values(),
+ currentTimeMs
);
}
diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
index e07a2ad04cb..06deb28c69c 100644
--- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.raft;
-import org.apache.kafka.common.message.DescribeQuorumResponseData;
import org.apache.kafka.common.message.KRaftVersionRecord;
import org.apache.kafka.common.message.LeaderChangeMessage;
import org.apache.kafka.common.message.LeaderChangeMessage.Voter;
@@ -38,7 +37,6 @@ import org.apache.kafka.server.common.KRaftVersion;
import org.slf4j.Logger;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -113,7 +111,7 @@ public class LeaderState<T> implements EpochState {
boolean hasAcknowledgedLeader = voterNode.isVoter(localReplicaKey);
this.voterStates.put(
voterNode.voterKey().id(),
- new ReplicaState(voterNode.voterKey(), hasAcknowledgedLeader)
+ new ReplicaState(voterNode.voterKey(), hasAcknowledgedLeader,
voterNode.listeners())
);
}
this.grantingVoters = Collections.unmodifiableSet(new
HashSet<>(grantingVoters));
@@ -395,6 +393,15 @@ public class LeaderState<T> implements EpochState {
return endpoints;
}
+ Map<Integer, ReplicaState> voterStates() {
+ return voterStates;
+ }
+
+ Map<ReplicaKey, ReplicaState> observerStates(final long currentTimeMs) {
+ clearInactiveObservers(currentTimeMs);
+ return observerStates;
+ }
+
public Set<Integer> grantingVoters() {
return this.grantingVoters;
}
@@ -599,7 +606,7 @@ public class LeaderState<T> implements EpochState {
private ReplicaState getOrCreateReplicaState(ReplicaKey replicaKey) {
ReplicaState state = voterStates.get(replicaKey.id());
if (state == null || !state.matchesKey(replicaKey)) {
- observerStates.putIfAbsent(replicaKey, new
ReplicaState(replicaKey, false));
+ observerStates.putIfAbsent(replicaKey, new
ReplicaState(replicaKey, false, Endpoints.empty()));
return observerStates.get(replicaKey);
}
return state;
@@ -614,58 +621,6 @@ public class LeaderState<T> implements EpochState {
return Optional.ofNullable(state);
}
- public DescribeQuorumResponseData.PartitionData describeQuorum(long
currentTimeMs) {
- clearInactiveObservers(currentTimeMs);
-
- return new DescribeQuorumResponseData.PartitionData()
- .setErrorCode(Errors.NONE.code())
- .setLeaderId(localReplicaKey.id())
- .setLeaderEpoch(epoch)
-
.setHighWatermark(highWatermark.map(LogOffsetMetadata::offset).orElse(-1L))
- .setCurrentVoters(describeReplicaStates(voterStates.values(),
currentTimeMs))
- .setObservers(describeReplicaStates(observerStates.values(),
currentTimeMs));
- }
-
- public DescribeQuorumResponseData.NodeCollection nodes(long currentTimeMs)
{
- clearInactiveObservers(currentTimeMs);
-
- // KAFKA-16953 will add support for including the node listeners in
the node collection
- return new DescribeQuorumResponseData.NodeCollection();
- }
-
- private List<DescribeQuorumResponseData.ReplicaState>
describeReplicaStates(
- Collection<ReplicaState> states,
- long currentTimeMs
- ) {
- return states
- .stream()
- .map(replicaState -> describeReplicaState(replicaState,
currentTimeMs))
- .collect(Collectors.toList());
- }
-
- private DescribeQuorumResponseData.ReplicaState describeReplicaState(
- ReplicaState replicaState,
- long currentTimeMs
- ) {
- final long lastCaughtUpTimestamp;
- final long lastFetchTimestamp;
- if (replicaState.matchesKey(localReplicaKey)) {
- lastCaughtUpTimestamp = currentTimeMs;
- lastFetchTimestamp = currentTimeMs;
- } else {
- lastCaughtUpTimestamp = replicaState.lastCaughtUpTimestamp;
- lastFetchTimestamp = replicaState.lastFetchTimestamp;
- }
-
- // KAFKA-16953 will add support for the replica directory id
- return new DescribeQuorumResponseData.ReplicaState()
- .setReplicaId(replicaState.replicaKey.id())
-
.setLogEndOffset(replicaState.endOffset.map(LogOffsetMetadata::offset).orElse(-1L))
- .setLastCaughtUpTimestamp(lastCaughtUpTimestamp)
- .setLastFetchTimestamp(lastFetchTimestamp);
-
- }
-
/**
* Clear observer states that have not been active for a while and are not
the leader.
*/
@@ -688,7 +643,7 @@ public class LeaderState<T> implements EpochState {
// Compute the new voter states map
for (VoterSet.VoterNode voterNode : lastVoterSet.voterNodes()) {
ReplicaState state = getReplicaState(voterNode.voterKey())
- .orElse(new ReplicaState(voterNode.voterKey(), false));
+ .orElse(new ReplicaState(voterNode.voterKey(), false,
voterNode.listeners()));
// Remove the voter from the previous data structures
oldVoterStates.remove(voterNode.voterKey().id());
@@ -702,20 +657,23 @@ public class LeaderState<T> implements EpochState {
// Move any of the remaining old voters to observerStates
for (ReplicaState replicaStateEntry : oldVoterStates.values()) {
+ replicaStateEntry.clearListeners();
observerStates.putIfAbsent(replicaStateEntry.replicaKey,
replicaStateEntry);
}
}
- private static class ReplicaState implements Comparable<ReplicaState> {
- ReplicaKey replicaKey;
- Optional<LogOffsetMetadata> endOffset;
- long lastFetchTimestamp;
- long lastFetchLeaderLogEndOffset;
- long lastCaughtUpTimestamp;
- boolean hasAcknowledgedLeader;
+ static class ReplicaState implements Comparable<ReplicaState> {
+ private ReplicaKey replicaKey;
+ private Endpoints listeners;
+ private Optional<LogOffsetMetadata> endOffset;
+ private long lastFetchTimestamp;
+ private long lastFetchLeaderLogEndOffset;
+ private long lastCaughtUpTimestamp;
+ private boolean hasAcknowledgedLeader;
- public ReplicaState(ReplicaKey replicaKey, boolean
hasAcknowledgedLeader) {
+ public ReplicaState(ReplicaKey replicaKey, boolean
hasAcknowledgedLeader, Endpoints listeners) {
this.replicaKey = replicaKey;
+ this.listeners = listeners;
this.endOffset = Optional.empty();
this.lastFetchTimestamp = -1;
this.lastFetchLeaderLogEndOffset = -1;
@@ -723,6 +681,26 @@ public class LeaderState<T> implements EpochState {
this.hasAcknowledgedLeader = hasAcknowledgedLeader;
}
+ public ReplicaKey replicaKey() {
+ return replicaKey;
+ }
+
+ public Endpoints listeners() {
+ return listeners;
+ }
+
+ public Optional<LogOffsetMetadata> endOffset() {
+ return endOffset;
+ }
+
+ public long lastFetchTimestamp() {
+ return lastFetchTimestamp;
+ }
+
+ public long lastCaughtUpTimestamp() {
+ return lastCaughtUpTimestamp;
+ }
+
void setReplicaKey(ReplicaKey replicaKey) {
if (this.replicaKey.id() != replicaKey.id()) {
throw new IllegalArgumentException(
@@ -747,6 +725,10 @@ public class LeaderState<T> implements EpochState {
this.replicaKey = replicaKey;
}
+ void clearListeners() {
+ this.listeners = Endpoints.empty();
+ }
+
boolean matchesKey(ReplicaKey replicaKey) {
if (this.replicaKey.id() != replicaKey.id()) return false;
diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
index e5c36e13703..4d2f3bc06e8 100644
--- a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
+++ b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
@@ -41,6 +41,7 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.raft.internals.ReplicaKey;
import java.net.InetSocketAddress;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
@@ -481,8 +482,12 @@ public class RaftUtil {
public static DescribeQuorumResponseData singletonDescribeQuorumResponse(
short apiVersion,
TopicPartition topicPartition,
- DescribeQuorumResponseData.PartitionData partitionData,
- DescribeQuorumResponseData.NodeCollection nodes
+ int leaderId,
+ int leaderEpoch,
+ long highWatermark,
+ Collection<LeaderState.ReplicaState> voters,
+ Collection<LeaderState.ReplicaState> observers,
+ long currentTimeMs
) {
DescribeQuorumResponseData response = new DescribeQuorumResponseData()
.setTopics(
@@ -491,16 +496,25 @@ public class RaftUtil {
.setTopicName(topicPartition.topic())
.setPartitions(
Collections.singletonList(
-
partitionData.setPartitionIndex(topicPartition.partition())
- )
- )
- )
- );
-
+ new DescribeQuorumResponseData.PartitionData()
+
.setPartitionIndex(topicPartition.partition())
+ .setErrorCode(Errors.NONE.code())
+ .setLeaderId(leaderId)
+ .setLeaderEpoch(leaderEpoch)
+ .setHighWatermark(highWatermark)
+
.setCurrentVoters(toReplicaStates(apiVersion, leaderId, voters, currentTimeMs))
+ .setObservers(toReplicaStates(apiVersion,
leaderId, observers, currentTimeMs))))));
if (apiVersion >= 2) {
+ DescribeQuorumResponseData.NodeCollection nodes = new
DescribeQuorumResponseData.NodeCollection(voters.size());
+ for (LeaderState.ReplicaState voter : voters) {
+ nodes.add(
+ new DescribeQuorumResponseData.Node()
+ .setNodeId(voter.replicaKey().id())
+
.setListeners(voter.listeners().toDescribeQuorumResponseListeners())
+ );
+ }
response.setNodes(nodes);
}
-
return response;
}
@@ -528,7 +542,7 @@ public class RaftUtil {
.setErrorCode(error.code())
.setErrorMessage(errorMessage);
}
-
+
public static RemoveRaftVoterRequestData removeVoterRequest(
String clusterId,
ReplicaKey voter
@@ -550,6 +564,45 @@ public class RaftUtil {
.setErrorMessage(errorMessage);
}
+ private static List<DescribeQuorumResponseData.ReplicaState>
toReplicaStates(
+ short apiVersion,
+ int leaderId,
+ Collection<LeaderState.ReplicaState> states,
+ long currentTimeMs
+ ) {
+ return states
+ .stream()
+ .map(replicaState -> toReplicaState(apiVersion, leaderId,
replicaState, currentTimeMs))
+ .collect(Collectors.toList());
+ }
+
+ private static DescribeQuorumResponseData.ReplicaState toReplicaState(
+ short apiVersion,
+ int leaderId,
+ LeaderState.ReplicaState replicaState,
+ long currentTimeMs
+ ) {
+ final long lastCaughtUpTimestamp;
+ final long lastFetchTimestamp;
+ if (replicaState.replicaKey().id() == leaderId) {
+ lastCaughtUpTimestamp = currentTimeMs;
+ lastFetchTimestamp = currentTimeMs;
+ } else {
+ lastCaughtUpTimestamp = replicaState.lastCaughtUpTimestamp();
+ lastFetchTimestamp = replicaState.lastFetchTimestamp();
+ }
+ DescribeQuorumResponseData.ReplicaState replicaStateData = new
DescribeQuorumResponseData.ReplicaState()
+ .setReplicaId(replicaState.replicaKey().id())
+
.setLogEndOffset(replicaState.endOffset().map(LogOffsetMetadata::offset).orElse(-1L))
+ .setLastCaughtUpTimestamp(lastCaughtUpTimestamp)
+ .setLastFetchTimestamp(lastFetchTimestamp);
+
+ if (apiVersion >= 2) {
+
replicaStateData.setReplicaDirectoryId(replicaState.replicaKey().directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID));
+ }
+ return replicaStateData;
+ }
+
public static Optional<ReplicaKey> voteRequestVoterKey(
VoteRequestData request,
VoteRequestData.PartitionData partition
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java
b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java
index e665794c2af..447b44a93d9 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java
@@ -325,7 +325,10 @@ public final class VoterSet {
}
}
- Endpoints listeners() {
+ /**
+ * Returns the listeners of the voter node
+ */
+ public Endpoints listeners() {
return listeners;
}
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index 8f78f29c7ba..419618c00ff 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -42,6 +42,8 @@ import org.apache.kafka.raft.errors.BufferAllocationException;
import org.apache.kafka.raft.errors.NotLeaderException;
import org.apache.kafka.raft.errors.UnexpectedBaseOffsetException;
import org.apache.kafka.raft.internals.ReplicaKey;
+import org.apache.kafka.raft.internals.VoterSet;
+import org.apache.kafka.raft.internals.VoterSetTest;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
@@ -2765,77 +2767,412 @@ public class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
- public void testDescribeQuorum(boolean withKip853Rpc) throws Exception {
+ public void testDescribeQuorumWithOnlyStaticVoters(boolean withKip853Rpc)
throws Exception {
int localId = randomReplicaId();
- ReplicaKey closeFollower = replicaKey(localId + 2, withKip853Rpc);
- ReplicaKey laggingFollower = replicaKey(localId + 1, withKip853Rpc);
- Set<Integer> voters = Utils.mkSet(localId, closeFollower.id(),
laggingFollower.id());
+ ReplicaKey local = replicaKey(localId, true);
+ ReplicaKey follower1 = replicaKey(localId + 1, true);
+ Set<Integer> voters = Utils.mkSet(localId, follower1.id());
- RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
+ RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, local.directoryId().get())
+ .withStaticVoters(voters)
.withKip853Rpc(withKip853Rpc)
.build();
context.becomeLeader();
int epoch = context.currentEpoch();
- long laggingFollowerFetchTime = context.time.milliseconds();
- context.deliverRequest(context.fetchRequest(1, laggingFollower, 1L,
epoch, 0));
+ // Describe quorum response will not include directory ids
+ context.deliverRequest(context.describeQuorumRequest());
context.pollUntilResponse();
- context.assertSentFetchPartitionResponse(1L, epoch);
+ List<ReplicaState> expectedVoterStates = Arrays.asList(
+ new ReplicaState()
+ .setReplicaId(localId)
+ .setReplicaDirectoryId(ReplicaKey.NO_DIRECTORY_ID)
+ .setLogEndOffset(1L)
+ .setLastFetchTimestamp(context.time.milliseconds())
+ .setLastCaughtUpTimestamp(context.time.milliseconds()),
+ new ReplicaState()
+ .setReplicaId(follower1.id())
+ .setReplicaDirectoryId(ReplicaKey.NO_DIRECTORY_ID)
+ .setLogEndOffset(-1L)
+ .setLastFetchTimestamp(-1)
+ .setLastCaughtUpTimestamp(-1));
+ context.assertSentDescribeQuorumResponse(localId, epoch, -1L,
expectedVoterStates, Collections.emptyList());
+ }
- context.client.scheduleAppend(epoch, Arrays.asList("foo", "bar"));
+ @ParameterizedTest
+ @CsvSource({ "true, true", "true, false", "false, false" })
+ public void testDescribeQuorumWithFollowers(boolean withKip853Rpc, boolean
withBootstrapSnapshot) throws Exception {
+ int localId = randomReplicaId();
+ int followerId1 = localId + 1;
+ int followerId2 = localId + 2;
+ ReplicaKey local = replicaKey(localId, withBootstrapSnapshot);
+ // local directory id must exist
+ Uuid localDirectoryId = local.directoryId().orElse(Uuid.randomUuid());
+ ReplicaKey bootstrapFollower1 = replicaKey(followerId1,
withBootstrapSnapshot);
+ // if withBootstrapSnapshot is false, directory ids are still needed
by the static voter set
+ Uuid followerDirectoryId1 =
bootstrapFollower1.directoryId().orElse(withKip853Rpc ? Uuid.randomUuid() :
ReplicaKey.NO_DIRECTORY_ID);
+ ReplicaKey follower1 = ReplicaKey.of(followerId1,
followerDirectoryId1);
+ ReplicaKey bootstrapFollower2 = replicaKey(followerId2,
withBootstrapSnapshot);
+ Uuid followerDirectoryId2 =
bootstrapFollower2.directoryId().orElse(withKip853Rpc ? Uuid.randomUuid() :
ReplicaKey.NO_DIRECTORY_ID);
+ ReplicaKey follower2 = ReplicaKey.of(followerId2,
followerDirectoryId2);
+
+ RaftClientTestContext.Builder builder = new
RaftClientTestContext.Builder(localId, localDirectoryId)
+ .withKip853Rpc(withKip853Rpc);
+
+ if (withBootstrapSnapshot) {
+ VoterSet bootstrapVoterSet =
VoterSetTest.voterSet(Stream.of(local, bootstrapFollower1, bootstrapFollower2));
+ builder.withBootstrapSnapshot(Optional.of(bootstrapVoterSet));
+ } else {
+ VoterSet staticVoterSet = VoterSetTest.voterSet(Stream.of(local,
follower1, follower2));
+ builder.withStaticVoters(staticVoterSet);
+ }
+ RaftClientTestContext context = builder.build();
+
+ context.becomeLeader();
+ int epoch = context.currentEpoch();
+
+ // Describe quorum response before any fetches made
+ context.deliverRequest(context.describeQuorumRequest());
+ context.pollUntilResponse();
+ List<ReplicaState> expectedVoterStates = Arrays.asList(
+ new ReplicaState()
+ .setReplicaId(localId)
+ .setReplicaDirectoryId(withBootstrapSnapshot ?
context.localReplicaKey().directoryId().get() : ReplicaKey.NO_DIRECTORY_ID)
+ .setLogEndOffset(withBootstrapSnapshot ? 3L : 1L)
+ .setLastFetchTimestamp(context.time.milliseconds())
+ .setLastCaughtUpTimestamp(context.time.milliseconds()),
+ new ReplicaState()
+ .setReplicaId(followerId1)
+ .setReplicaDirectoryId(withBootstrapSnapshot ?
follower1.directoryId().get() : ReplicaKey.NO_DIRECTORY_ID)
+ .setLogEndOffset(-1L)
+ .setLastFetchTimestamp(-1)
+ .setLastCaughtUpTimestamp(-1),
+ new ReplicaState()
+ .setReplicaId(followerId2)
+ .setReplicaDirectoryId(withBootstrapSnapshot ?
follower2.directoryId().get() : ReplicaKey.NO_DIRECTORY_ID)
+ .setLogEndOffset(-1L)
+ .setLastFetchTimestamp(-1)
+ .setLastCaughtUpTimestamp(-1));
+ context.assertSentDescribeQuorumResponse(localId, epoch, -1L,
expectedVoterStates, Collections.emptyList());
+
+ context.time.sleep(100);
+ long fetchOffset = withBootstrapSnapshot ? 3L : 1L;
+ long followerFetchTime1 = context.time.milliseconds();
+ context.deliverRequest(context.fetchRequest(1, follower1, fetchOffset,
epoch, 0));
+ context.pollUntilResponse();
+ long expectedHW = fetchOffset;
+ context.assertSentFetchPartitionResponse(expectedHW, epoch);
+
+ List<String> records = Arrays.asList("foo", "bar");
+ long nextFetchOffset = fetchOffset + records.size();
+ context.client.scheduleAppend(epoch, records);
context.client.poll();
context.time.sleep(100);
- long closeFollowerFetchTime = context.time.milliseconds();
- context.deliverRequest(context.fetchRequest(epoch, closeFollower, 3L,
epoch, 0));
+ context.deliverRequest(context.describeQuorumRequest());
context.pollUntilResponse();
- context.assertSentFetchPartitionResponse(3L, epoch);
+
+ expectedVoterStates.get(0)
+ .setLogEndOffset(nextFetchOffset)
+ .setLastFetchTimestamp(context.time.milliseconds())
+ .setLastCaughtUpTimestamp(context.time.milliseconds());
+ expectedVoterStates.get(1)
+ .setLogEndOffset(fetchOffset)
+ .setLastFetchTimestamp(followerFetchTime1)
+ .setLastCaughtUpTimestamp(followerFetchTime1);
+ context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW,
expectedVoterStates, Collections.emptyList());
+
+ // After follower2 catches up to leader
+ context.time.sleep(100);
+ long followerFetchTime2 = context.time.milliseconds();
+ context.deliverRequest(context.fetchRequest(epoch, follower2,
nextFetchOffset, epoch, 0));
+ context.pollUntilResponse();
+ expectedHW = nextFetchOffset;
+ context.assertSentFetchPartitionResponse(expectedHW, epoch);
+
+ context.time.sleep(100);
+ context.deliverRequest(context.describeQuorumRequest());
+ context.pollUntilResponse();
+
+ expectedVoterStates.get(0)
+ .setLastFetchTimestamp(context.time.milliseconds())
+ .setLastCaughtUpTimestamp(context.time.milliseconds());
+ expectedVoterStates.get(2)
+ .setLogEndOffset(nextFetchOffset)
+ .setLastFetchTimestamp(followerFetchTime2)
+ .setLastCaughtUpTimestamp(followerFetchTime2);
+ context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW,
expectedVoterStates, Collections.emptyList());
+
+ // Describe quorum returns error if leader loses leadership
+ context.time.sleep(context.checkQuorumTimeoutMs);
+ context.deliverRequest(context.describeQuorumRequest());
+ context.pollUntilResponse();
+
context.assertSentDescribeQuorumResponse(Errors.NOT_LEADER_OR_FOLLOWER, 0, 0,
0, Collections.emptyList(), Collections.emptyList());
+ }
+
+ @ParameterizedTest
+ @CsvSource({ "true, true", "true, false", "false, false" })
+ public void testDescribeQuorumWithObserver(boolean withKip853Rpc, boolean
withBootstrapSnapshot) throws Exception {
+ int localId = randomReplicaId();
+ int followerId = localId + 1;
+ ReplicaKey local = replicaKey(localId, withBootstrapSnapshot);
+ Uuid localDirectoryId = local.directoryId().orElse(Uuid.randomUuid());
+ ReplicaKey bootstrapFollower = replicaKey(followerId,
withBootstrapSnapshot);
+ Uuid followerDirectoryId =
bootstrapFollower.directoryId().orElse(withKip853Rpc ? Uuid.randomUuid() :
ReplicaKey.NO_DIRECTORY_ID);
+ ReplicaKey follower = ReplicaKey.of(followerId, followerDirectoryId);
+
+ RaftClientTestContext.Builder builder = new
RaftClientTestContext.Builder(localId, localDirectoryId)
+ .withKip853Rpc(withKip853Rpc);
+
+ if (withBootstrapSnapshot) {
+ VoterSet bootstrapVoterSet =
VoterSetTest.voterSet(Stream.of(local, bootstrapFollower));
+ builder.withBootstrapSnapshot(Optional.of(bootstrapVoterSet));
+ } else {
+ VoterSet staticVoterSet = VoterSetTest.voterSet(Stream.of(local,
follower));
+ builder.withStaticVoters(staticVoterSet);
+ }
+ RaftClientTestContext context = builder.build();
+
+ context.becomeLeader();
+ int epoch = context.currentEpoch();
+
+ // Update HW to non-initial value
+ context.time.sleep(100);
+ long fetchOffset = withBootstrapSnapshot ? 3L : 1L;
+ long followerFetchTime = context.time.milliseconds();
+ context.deliverRequest(context.fetchRequest(1, follower, fetchOffset,
epoch, 0));
+ context.pollUntilResponse();
+ long expectedHW = fetchOffset;
+ context.assertSentFetchPartitionResponse(expectedHW, epoch);
// Create observer
- ReplicaKey observerId = replicaKey(localId + 3, withKip853Rpc);
+ ReplicaKey observer = replicaKey(localId + 2, withKip853Rpc);
+ Uuid observerDirectoryId =
observer.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID);
context.time.sleep(100);
long observerFetchTime = context.time.milliseconds();
- context.deliverRequest(context.fetchRequest(epoch, observerId, 0L, 0,
0));
+ context.deliverRequest(context.fetchRequest(epoch, observer, 0L, 0,
0));
context.pollUntilResponse();
- context.assertSentFetchPartitionResponse(3L, epoch);
+ context.assertSentFetchPartitionResponse(expectedHW, epoch);
+
+ context.time.sleep(100);
+ context.deliverRequest(context.describeQuorumRequest());
+ context.pollUntilResponse();
+
+ List<ReplicaState> expectedVoterStates = Arrays.asList(
+ new ReplicaState()
+ .setReplicaId(localId)
+ .setReplicaDirectoryId(withBootstrapSnapshot ?
localDirectoryId : ReplicaKey.NO_DIRECTORY_ID)
+ // As we are appending the records directly to the log,
+ // the leader end offset hasn't been updated yet.
+ .setLogEndOffset(fetchOffset)
+ .setLastFetchTimestamp(context.time.milliseconds())
+ .setLastCaughtUpTimestamp(context.time.milliseconds()),
+ new ReplicaState()
+ .setReplicaId(follower.id())
+ .setReplicaDirectoryId(withBootstrapSnapshot ?
followerDirectoryId : ReplicaKey.NO_DIRECTORY_ID)
+ .setLogEndOffset(fetchOffset)
+ .setLastFetchTimestamp(followerFetchTime)
+ .setLastCaughtUpTimestamp(followerFetchTime));
+ List<ReplicaState> expectedObserverStates = singletonList(
+ new ReplicaState()
+ .setReplicaId(observer.id())
+ .setReplicaDirectoryId(observerDirectoryId)
+ .setLogEndOffset(0L)
+ .setLastFetchTimestamp(observerFetchTime)
+ .setLastCaughtUpTimestamp(-1L));
+ context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW,
expectedVoterStates, expectedObserverStates);
+
+ // Update observer fetch state
+ context.time.sleep(100);
+ observerFetchTime = context.time.milliseconds();
+ context.deliverRequest(context.fetchRequest(epoch, observer,
fetchOffset, epoch, 0));
+ context.pollUntilResponse();
+ context.assertSentFetchPartitionResponse(expectedHW, epoch);
+
+ context.time.sleep(100);
+ context.deliverRequest(context.describeQuorumRequest());
+ context.pollUntilResponse();
+
+ expectedVoterStates.get(0)
+ .setLastFetchTimestamp(context.time.milliseconds())
+ .setLastCaughtUpTimestamp(context.time.milliseconds());
+ expectedObserverStates.get(0)
+ .setLogEndOffset(fetchOffset)
+ .setLastFetchTimestamp(observerFetchTime)
+ .setLastCaughtUpTimestamp(observerFetchTime);
+ context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW,
expectedVoterStates, expectedObserverStates);
+
+ // Observer falls behind
+ context.time.sleep(100);
+ List<String> records = Arrays.asList("foo", "bar");
+ context.client.scheduleAppend(epoch, records);
+ context.client.poll();
+
+ context.deliverRequest(context.describeQuorumRequest());
+ context.pollUntilResponse();
+
+ expectedVoterStates.get(0)
+ .setLogEndOffset(fetchOffset + records.size())
+ .setLastFetchTimestamp(context.time.milliseconds())
+ .setLastCaughtUpTimestamp(context.time.milliseconds());
+ context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW,
expectedVoterStates, expectedObserverStates);
+
+ // Observer is removed due to inactivity
+ long timeToSleep = LeaderState.OBSERVER_SESSION_TIMEOUT_MS;
+ while (timeToSleep > 0) {
+ // Follower needs to continue polling to keep leader alive
+ followerFetchTime = context.time.milliseconds();
+ context.deliverRequest(context.fetchRequest(epoch, follower,
fetchOffset, epoch, 0));
+ context.pollUntilResponse();
+ context.assertSentFetchPartitionResponse(expectedHW, epoch);
+
+ context.time.sleep(context.checkQuorumTimeoutMs - 1);
+ timeToSleep = timeToSleep - (context.checkQuorumTimeoutMs - 1);
+ }
+ context.deliverRequest(context.describeQuorumRequest());
+ context.pollUntilResponse();
+
+ expectedVoterStates.get(0)
+ .setLastFetchTimestamp(context.time.milliseconds())
+ .setLastCaughtUpTimestamp(context.time.milliseconds());
+ expectedVoterStates.get(1)
+ .setLastFetchTimestamp(followerFetchTime);
+ context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW,
expectedVoterStates, Collections.emptyList());
+
+ // No-op for negative node id
+ context.deliverRequest(context.fetchRequest(epoch, ReplicaKey.of(-1,
ReplicaKey.NO_DIRECTORY_ID), 0L, 0, 0));
+ context.pollUntilResponse();
+ context.assertSentFetchPartitionResponse(expectedHW, epoch);
+ context.deliverRequest(context.describeQuorumRequest());
+ context.pollUntilResponse();
+
+ expectedVoterStates.get(0)
+ .setLastFetchTimestamp(context.time.milliseconds())
+ .setLastCaughtUpTimestamp(context.time.milliseconds());
+ context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW,
expectedVoterStates, Collections.emptyList());
+ }
+
+ @ParameterizedTest
+ @CsvSource({ "true, true", "true, false", "false, false" })
+ public void testDescribeQuorumNonMonotonicFollowerFetch(boolean
withKip853Rpc, boolean withBootstrapSnapshot) throws Exception {
+ int localId = randomReplicaId();
+ ReplicaKey local = replicaKey(localId, withBootstrapSnapshot);
+ Uuid localDirectoryId = local.directoryId().orElse(Uuid.randomUuid());
+ int followerId = localId + 1;
+ ReplicaKey bootstrapFollower = replicaKey(followerId,
withBootstrapSnapshot);
+ Uuid followerDirectoryId =
bootstrapFollower.directoryId().orElse(withKip853Rpc ? Uuid.randomUuid() :
ReplicaKey.NO_DIRECTORY_ID);
+ ReplicaKey follower = ReplicaKey.of(followerId, followerDirectoryId);
+
+ RaftClientTestContext.Builder builder = new
RaftClientTestContext.Builder(localId, localDirectoryId)
+ .withKip853Rpc(withKip853Rpc);
+ if (withBootstrapSnapshot) {
+ VoterSet bootstrapVoterSet =
VoterSetTest.voterSet(Stream.of(local, bootstrapFollower));
+ builder.withBootstrapSnapshot(Optional.of(bootstrapVoterSet));
+ } else {
+ VoterSet staticVoterSet = VoterSetTest.voterSet(Stream.of(local,
follower));
+ builder.withStaticVoters(staticVoterSet);
+ }
+ RaftClientTestContext context = builder.build();
+
+ context.becomeLeader();
+ int epoch = context.currentEpoch();
+
+ // Update HW to non-initial value
+ context.time.sleep(100);
+ List<String> batch = Arrays.asList("foo", "bar");
+ context.client.scheduleAppend(epoch, batch);
+ context.client.poll();
+ long fetchOffset = withBootstrapSnapshot ? 5L : 3L;
+ long followerFetchTime = context.time.milliseconds();
+ context.deliverRequest(context.fetchRequest(epoch, follower,
fetchOffset, epoch, 0));
+ context.pollUntilResponse();
+ long expectedHW = fetchOffset;
+ context.assertSentFetchPartitionResponse(expectedHW, epoch);
context.time.sleep(100);
context.deliverRequest(context.describeQuorumRequest());
context.pollUntilResponse();
+ List<ReplicaState> expectedVoterStates = Arrays.asList(
+ new ReplicaState()
+ .setReplicaId(localId)
+ .setReplicaDirectoryId(withBootstrapSnapshot ?
local.directoryId().get() : ReplicaKey.NO_DIRECTORY_ID)
+ .setLogEndOffset(fetchOffset)
+ .setLastFetchTimestamp(context.time.milliseconds())
+ .setLastCaughtUpTimestamp(context.time.milliseconds()),
+ new ReplicaState()
+ .setReplicaId(follower.id())
+ .setReplicaDirectoryId(withBootstrapSnapshot ?
follower.directoryId().get() : ReplicaKey.NO_DIRECTORY_ID)
+ .setLogEndOffset(fetchOffset)
+ .setLastFetchTimestamp(followerFetchTime)
+ .setLastCaughtUpTimestamp(followerFetchTime));
+ context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW,
expectedVoterStates, Collections.emptyList());
+
+ // Follower crashes and disk is lost. It fetches an earlier offset to
rebuild state.
+ // The leader will report an error in the logs, but will not let the
high watermark rewind
+ context.time.sleep(100);
+ followerFetchTime = context.time.milliseconds();
+ context.deliverRequest(context.fetchRequest(epoch, follower,
fetchOffset - 1, epoch, 0));
+ context.pollUntilResponse();
+ context.assertSentFetchPartitionResponse(expectedHW, epoch);
+ context.time.sleep(100);
+ context.deliverRequest(context.describeQuorumRequest());
+ context.pollUntilResponse();
+
+ expectedVoterStates.get(0)
+ .setLastFetchTimestamp(context.time.milliseconds())
+ .setLastCaughtUpTimestamp(context.time.milliseconds());
+ // The follower's latest fetch was at offset fetchOffset - 1, and the leader reports
+ // the fetch offset as the follower's log end offset. Its lastCaughtUpTimestamp stays
+ // at the time of its earlier, fully caught-up fetch.
+ expectedVoterStates.get(1)
+ .setLogEndOffset(fetchOffset - 1)
+ .setLastFetchTimestamp(followerFetchTime);
+ context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW,
expectedVoterStates, Collections.emptyList());
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = { true, false })
+ public void testStaticVotersIgnoredWithBootstrapSnapshot(boolean
withKip853Rpc) throws Exception {
+ int localId = randomReplicaId();
+ ReplicaKey local = replicaKey(localId, true);
+ ReplicaKey follower = replicaKey(localId + 1, true);
+ ReplicaKey follower2 = replicaKey(localId + 2, true);
+ // only include one follower in static voter set
+ Set<Integer> staticVoters = Utils.mkSet(localId, follower.id());
+ VoterSet voterSet = VoterSetTest.voterSet(Stream.of(local, follower,
follower2));
+
+ RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, local.directoryId().get())
+ .withStaticVoters(staticVoters)
+ .withKip853Rpc(withKip853Rpc)
+ .withBootstrapSnapshot(Optional.of(voterSet))
+ .build();
- // KAFKA-16953 will add support for including the directory id
- context.assertSentDescribeQuorumResponse(localId, epoch, 3L,
- Arrays.asList(
- new ReplicaState()
- .setReplicaId(localId)
- .setReplicaDirectoryId(ReplicaKey.NO_DIRECTORY_ID)
- // As we are appending the records directly to the log,
- // the leader end offset hasn't been updated yet.
- .setLogEndOffset(3L)
- .setLastFetchTimestamp(context.time.milliseconds())
- .setLastCaughtUpTimestamp(context.time.milliseconds()),
- new ReplicaState()
- .setReplicaId(laggingFollower.id())
- .setReplicaDirectoryId(ReplicaKey.NO_DIRECTORY_ID)
- .setLogEndOffset(1L)
- .setLastFetchTimestamp(laggingFollowerFetchTime)
- .setLastCaughtUpTimestamp(laggingFollowerFetchTime),
- new ReplicaState()
- .setReplicaId(closeFollower.id())
- .setReplicaDirectoryId(ReplicaKey.NO_DIRECTORY_ID)
- .setLogEndOffset(3L)
- .setLastFetchTimestamp(closeFollowerFetchTime)
- .setLastCaughtUpTimestamp(closeFollowerFetchTime)),
- singletonList(
- new ReplicaState()
- .setReplicaId(observerId.id())
- .setReplicaDirectoryId(ReplicaKey.NO_DIRECTORY_ID)
- .setLogEndOffset(0L)
- .setLastFetchTimestamp(observerFetchTime)
- .setLastCaughtUpTimestamp(-1L)));
+ context.becomeLeader();
+ int epoch = context.currentEpoch();
+ // check describe quorum response has both followers
+ context.deliverRequest(context.describeQuorumRequest());
+ context.pollUntilResponse();
+ List<ReplicaState> expectedVoterStates = Arrays.asList(
+ new ReplicaState()
+ .setReplicaId(localId)
+ .setReplicaDirectoryId(withKip853Rpc ?
local.directoryId().get() : ReplicaKey.NO_DIRECTORY_ID)
+ .setLogEndOffset(3L)
+ .setLastFetchTimestamp(context.time.milliseconds())
+ .setLastCaughtUpTimestamp(context.time.milliseconds()),
+ new ReplicaState()
+ .setReplicaId(follower.id())
+ .setReplicaDirectoryId(withKip853Rpc ?
follower.directoryId().get() : ReplicaKey.NO_DIRECTORY_ID)
+ .setLogEndOffset(-1L)
+ .setLastFetchTimestamp(-1)
+ .setLastCaughtUpTimestamp(-1),
+ new ReplicaState()
+ .setReplicaId(follower2.id())
+ .setReplicaDirectoryId(withKip853Rpc ?
follower2.directoryId().get() : ReplicaKey.NO_DIRECTORY_ID)
+ .setLogEndOffset(-1L)
+ .setLastFetchTimestamp(-1)
+ .setLastCaughtUpTimestamp(-1));
+ context.assertSentDescribeQuorumResponse(localId, epoch, -1L,
expectedVoterStates, Collections.emptyList());
}
+
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testLeaderGracefulShutdownTimeout(boolean withKip853Rpc)
throws Exception {
diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
index dfb5469434d..8ce18ec9d44 100644
--- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
@@ -17,7 +17,6 @@
package org.apache.kafka.raft;
import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.message.DescribeQuorumResponseData;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.raft.internals.BatchAccumulator;
@@ -33,7 +32,6 @@ import org.mockito.Mockito;
import java.util.Arrays;
import java.util.Collections;
-import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@@ -183,164 +181,6 @@ public class LeaderStateTest {
);
}
- @ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testLastCaughtUpTimeVoters(boolean withDirectoryId) {
- ReplicaKey nodeKey1 = replicaKey(1, withDirectoryId);
- ReplicaKey nodeKey2 = replicaKey(2, withDirectoryId);
- int currentTime = 1000;
- int fetchTime = 0;
- int caughtUpTime = -1;
- VoterSet voters = localWithRemoteVoterSet(Stream.of(nodeKey1,
nodeKey2), withDirectoryId);
- LeaderState<?> state = newLeaderState(voters, 10L);
-
- assertEquals(Optional.empty(), state.highWatermark());
- assertFalse(state.updateLocalState(new LogOffsetMetadata(10L),
voters));
- assertEquals(mkSet(nodeKey1, nodeKey2),
state.nonAcknowledgingVoters());
- assertEquals(Optional.empty(), state.highWatermark());
-
- // Node 1 falls behind
- assertFalse(state.updateLocalState(new LogOffsetMetadata(11L),
voters));
- assertFalse(state.updateReplicaState(nodeKey1, ++fetchTime, new
LogOffsetMetadata(10L)));
- assertEquals(
- currentTime,
- describeVoterState(state, localReplicaKey.id(),
currentTime).lastCaughtUpTimestamp()
- );
- assertEquals(
- caughtUpTime,
- describeVoterState(state, nodeKey1.id(),
currentTime).lastCaughtUpTimestamp()
- );
-
- // Node 1 catches up to leader
- assertTrue(state.updateReplicaState(nodeKey1, ++fetchTime, new
LogOffsetMetadata(11L)));
- caughtUpTime = fetchTime;
- assertEquals(
- currentTime,
- describeVoterState(state, localReplicaKey.id(),
currentTime).lastCaughtUpTimestamp()
- );
- assertEquals(
- caughtUpTime,
- describeVoterState(state, nodeKey1.id(),
currentTime).lastCaughtUpTimestamp()
- );
-
- // Node 1 falls behind
- assertFalse(state.updateLocalState(new LogOffsetMetadata(100L),
voters));
- assertTrue(state.updateReplicaState(nodeKey1, ++fetchTime, new
LogOffsetMetadata(50L)));
- assertEquals(currentTime, describeVoterState(state,
localReplicaKey.id(), currentTime).lastCaughtUpTimestamp());
- assertEquals(
- caughtUpTime,
- describeVoterState(state, nodeKey1.id(),
currentTime).lastCaughtUpTimestamp()
- );
-
- // Node 1 catches up to the last fetch offset
- int prevFetchTime = fetchTime;
- assertFalse(state.updateLocalState(new LogOffsetMetadata(200L),
voters));
- assertTrue(state.updateReplicaState(nodeKey1, ++fetchTime, new
LogOffsetMetadata(100L)));
- caughtUpTime = prevFetchTime;
- assertEquals(
- currentTime,
- describeVoterState(state, localReplicaKey.id(),
currentTime).lastCaughtUpTimestamp()
- );
- assertEquals(
- caughtUpTime,
- describeVoterState(state, nodeKey1.id(),
currentTime).lastCaughtUpTimestamp()
- );
-
- // Node2 has never caught up to leader
- assertEquals(
- -1L,
- describeVoterState(state, nodeKey2.id(),
currentTime).lastCaughtUpTimestamp()
- );
- assertFalse(state.updateLocalState(new LogOffsetMetadata(300L),
voters));
- assertTrue(state.updateReplicaState(nodeKey2, ++fetchTime, new
LogOffsetMetadata(200L)));
- assertEquals(
- -1L,
- describeVoterState(state, nodeKey2.id(),
currentTime).lastCaughtUpTimestamp()
- );
- assertTrue(state.updateReplicaState(nodeKey2, ++fetchTime, new
LogOffsetMetadata(250L)));
- assertEquals(
- -1L,
- describeVoterState(state, nodeKey2.id(),
currentTime).lastCaughtUpTimestamp()
- );
- }
-
- @ParameterizedTest
- @ValueSource(booleans = { true, false })
- public void testLastCaughtUpTimeObserver(boolean withDirectoryId) {
- ReplicaKey nodeKey1 = replicaKey(1, withDirectoryId);
- int currentTime = 1000;
- int fetchTime = 0;
- int caughtUpTime = -1;
-
- VoterSet voters = VoterSetTest.voterSet(Stream.of(localReplicaKey));
- LeaderState<?> state = newLeaderState(voters, 5L);
-
- assertEquals(Optional.empty(), state.highWatermark());
- assertEquals(emptySet(), state.nonAcknowledgingVoters());
-
- // Node 1 falls behind
- assertTrue(state.updateLocalState(new LogOffsetMetadata(11L), voters));
- assertFalse(state.updateReplicaState(nodeKey1, ++fetchTime, new
LogOffsetMetadata(10L)));
- assertEquals(
- currentTime,
- describeVoterState(state, localReplicaKey.id(),
currentTime).lastCaughtUpTimestamp()
- );
- assertEquals(
- caughtUpTime,
- describeObserverState(state, nodeKey1.id(),
currentTime).lastCaughtUpTimestamp()
- );
-
- // Node 1 catches up to leader
- assertFalse(state.updateReplicaState(nodeKey1, ++fetchTime, new
LogOffsetMetadata(11L)));
- caughtUpTime = fetchTime;
- assertEquals(
- currentTime,
- describeVoterState(state, localReplicaKey.id(),
currentTime).lastCaughtUpTimestamp()
- );
- assertEquals(
- caughtUpTime,
- describeObserverState(state, nodeKey1.id(),
currentTime).lastCaughtUpTimestamp()
- );
-
- // Node 1 falls behind
- assertTrue(state.updateLocalState(new LogOffsetMetadata(100L),
voters));
- assertFalse(state.updateReplicaState(nodeKey1, ++fetchTime, new
LogOffsetMetadata(50L)));
- assertEquals(
- currentTime,
- describeVoterState(state, localReplicaKey.id(),
currentTime).lastCaughtUpTimestamp()
- );
- assertEquals(
- caughtUpTime,
- describeObserverState(state, nodeKey1.id(),
currentTime).lastCaughtUpTimestamp()
- );
-
- // Node 1 catches up to the last fetch offset
- int prevFetchTime = fetchTime;
- assertTrue(state.updateLocalState(new LogOffsetMetadata(200L),
voters));
- assertFalse(state.updateReplicaState(nodeKey1, ++fetchTime, new
LogOffsetMetadata(102L)));
- caughtUpTime = prevFetchTime;
- assertEquals(
- currentTime,
- describeVoterState(state, localReplicaKey.id(),
currentTime).lastCaughtUpTimestamp()
- );
- assertEquals(
- caughtUpTime,
- describeObserverState(state, nodeKey1.id(),
currentTime).lastCaughtUpTimestamp()
- );
-
- // Node 1 catches up to leader
- assertFalse(state.updateReplicaState(nodeKey1, ++fetchTime, new
LogOffsetMetadata(200L)));
- caughtUpTime = fetchTime;
- assertEquals(
- currentTime,
- describeVoterState(state, localReplicaKey.id(),
currentTime).lastCaughtUpTimestamp()
- );
- assertEquals(
- caughtUpTime,
- describeObserverState(state, nodeKey1.id(),
currentTime).lastCaughtUpTimestamp()
- );
- }
-
@Test
public void testIdempotentEndOffsetUpdate() {
VoterSet voters = VoterSetTest.voterSet(Stream.of(localReplicaKey));
@@ -560,7 +400,6 @@ public class LeaderStateTest {
// Follower crashes and disk is lost. It fetches an earlier offset to
rebuild state.
// The leader will report an error in the logs, but will not let the
high watermark rewind
assertFalse(state.updateReplicaState(nodeKey1, time.milliseconds(),
new LogOffsetMetadata(5L)));
- assertEquals(5L, describeVoterState(state, nodeKey1.id(),
time.milliseconds()).logEndOffset());
assertEquals(Optional.of(new LogOffsetMetadata(10L)),
state.highWatermark());
}
@@ -587,301 +426,6 @@ public class LeaderStateTest {
);
}
- @Test
- public void testDescribeQuorumWithSingleVoter() {
- MockTime time = new MockTime();
- long leaderStartOffset = 10L;
- long leaderEndOffset = 15L;
-
- VoterSet voters = VoterSetTest.voterSet(Stream.of(localReplicaKey));
- LeaderState<?> state = newLeaderState(voters, leaderStartOffset);
-
- // Until we have updated local state, high watermark should be
uninitialized
- assertEquals(Optional.empty(), state.highWatermark());
- DescribeQuorumResponseData.PartitionData partitionData =
state.describeQuorum(time.milliseconds());
- assertEquals(-1, partitionData.highWatermark());
- assertEquals(localReplicaKey.id(), partitionData.leaderId());
- assertEquals(epoch, partitionData.leaderEpoch());
- assertEquals(Collections.emptyList(), partitionData.observers());
- assertEquals(1, partitionData.currentVoters().size());
- // KAFKA-16953 will add support for including the directory id
- assertEquals(
- new DescribeQuorumResponseData.ReplicaState()
- .setReplicaId(localReplicaKey.id())
- .setLogEndOffset(-1)
- .setLastFetchTimestamp(time.milliseconds())
- .setLastCaughtUpTimestamp(time.milliseconds()),
- partitionData.currentVoters().get(0)
- );
-
-
- // Now update the high watermark and verify the describe output
- assertTrue(state.updateLocalState(new
LogOffsetMetadata(leaderEndOffset), voters));
- assertEquals(Optional.of(new LogOffsetMetadata(leaderEndOffset)),
state.highWatermark());
-
- time.sleep(500);
-
- partitionData = state.describeQuorum(time.milliseconds());
- assertEquals(leaderEndOffset, partitionData.highWatermark());
- assertEquals(localReplicaKey.id(), partitionData.leaderId());
- assertEquals(epoch, partitionData.leaderEpoch());
- assertEquals(Collections.emptyList(), partitionData.observers());
- assertEquals(1, partitionData.currentVoters().size());
- // KAFKA-16953 will add support for including the directory id
- assertEquals(
- new DescribeQuorumResponseData.ReplicaState()
- .setReplicaId(localReplicaKey.id())
- .setLogEndOffset(leaderEndOffset)
- .setLastFetchTimestamp(time.milliseconds())
- .setLastCaughtUpTimestamp(time.milliseconds()),
- partitionData.currentVoters().get(0)
- );
- }
-
- @ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testDescribeQuorumWithMultipleVoters(boolean withDirectoryId) {
- MockTime time = new MockTime();
- ReplicaKey activeFollowerKey = replicaKey(1, withDirectoryId);
- ReplicaKey inactiveFollowerKey = replicaKey(2, withDirectoryId);
- long leaderStartOffset = 10L;
- long leaderEndOffset = 15L;
-
- VoterSet voters = localWithRemoteVoterSet(
- Stream.of(activeFollowerKey, inactiveFollowerKey),
- withDirectoryId
- );
- LeaderState<?> state = newLeaderState(voters, leaderStartOffset);
-
- assertFalse(state.updateLocalState(new
LogOffsetMetadata(leaderEndOffset), voters));
- assertEquals(Optional.empty(), state.highWatermark());
-
- long activeFollowerFetchTimeMs = time.milliseconds();
- assertTrue(
- state.updateReplicaState(
- activeFollowerKey,
- activeFollowerFetchTimeMs,
- new LogOffsetMetadata(leaderEndOffset)
- )
- );
- assertEquals(Optional.of(new LogOffsetMetadata(leaderEndOffset)),
state.highWatermark());
-
- time.sleep(500);
-
- DescribeQuorumResponseData.PartitionData partitionData =
state.describeQuorum(time.milliseconds());
- assertEquals(leaderEndOffset, partitionData.highWatermark());
- assertEquals(localReplicaKey.id(), partitionData.leaderId());
- assertEquals(epoch, partitionData.leaderEpoch());
- assertEquals(Collections.emptyList(), partitionData.observers());
-
- List<DescribeQuorumResponseData.ReplicaState> voterStates =
partitionData.currentVoters();
- assertEquals(3, voterStates.size());
-
- DescribeQuorumResponseData.ReplicaState leaderState =
findReplicaOrFail(
- localReplicaKey.id(),
- partitionData.currentVoters()
- );
- // KAFKA-16953 will add support for including the directory id
- assertEquals(
- new DescribeQuorumResponseData.ReplicaState()
- .setReplicaId(localReplicaKey.id())
- .setLogEndOffset(leaderEndOffset)
- .setLastFetchTimestamp(time.milliseconds())
- .setLastCaughtUpTimestamp(time.milliseconds()),
- leaderState
- );
-
- DescribeQuorumResponseData.ReplicaState activeFollowerState =
findReplicaOrFail(
- activeFollowerKey.id(),
- partitionData.currentVoters()
- );
- assertEquals(
- new DescribeQuorumResponseData.ReplicaState()
- .setReplicaId(activeFollowerKey.id())
- .setLogEndOffset(leaderEndOffset)
- .setLastFetchTimestamp(activeFollowerFetchTimeMs)
- .setLastCaughtUpTimestamp(activeFollowerFetchTimeMs),
- activeFollowerState
- );
-
- DescribeQuorumResponseData.ReplicaState inactiveFollowerState =
findReplicaOrFail(
- inactiveFollowerKey.id(),
- partitionData.currentVoters()
- );
- assertEquals(
- new DescribeQuorumResponseData.ReplicaState()
- .setReplicaId(inactiveFollowerKey.id())
- .setLogEndOffset(-1)
- .setLastFetchTimestamp(-1)
- .setLastCaughtUpTimestamp(-1),
- inactiveFollowerState
- );
- }
-
- @ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testDescribeQuorumWithObservers(boolean withDirectoryId) {
- MockTime time = new MockTime();
-
- ReplicaKey observerKey = replicaKey(10, withDirectoryId);
- long epochStartOffset = 10L;
-
- VoterSet voters = localWithRemoteVoterSet(Stream.empty(),
withDirectoryId);
- LeaderState<?> state = newLeaderState(voters, epochStartOffset);
- assertTrue(state.updateLocalState(new
LogOffsetMetadata(epochStartOffset + 1), voters));
- assertEquals(Optional.of(new LogOffsetMetadata(epochStartOffset + 1)),
state.highWatermark());
-
- time.sleep(500);
- long observerFetchTimeMs = time.milliseconds();
- assertFalse(
- state.updateReplicaState(
- observerKey,
- observerFetchTimeMs,
- new LogOffsetMetadata(epochStartOffset + 1)
- )
- );
-
- time.sleep(500);
- DescribeQuorumResponseData.PartitionData partitionData =
state.describeQuorum(time.milliseconds());
- assertEquals(epochStartOffset + 1, partitionData.highWatermark());
- assertEquals(localReplicaKey.id(), partitionData.leaderId());
- assertEquals(epoch, partitionData.leaderEpoch());
-
- assertEquals(1, partitionData.currentVoters().size());
- assertEquals(localReplicaKey.id(),
partitionData.currentVoters().get(0).replicaId());
- // KAFKA-16953 will add support for including the directory id
- assertEquals(
- ReplicaKey.NO_DIRECTORY_ID,
- partitionData.currentVoters().get(0).replicaDirectoryId()
- );
-
- List<DescribeQuorumResponseData.ReplicaState> observerStates =
partitionData.observers();
- assertEquals(1, observerStates.size());
-
- DescribeQuorumResponseData.ReplicaState observerState =
observerStates.get(0);
- // KAFKA-16953 will add support for including the directory id
- assertEquals(new DescribeQuorumResponseData.ReplicaState()
- .setReplicaId(observerKey.id())
- .setLogEndOffset(epochStartOffset + 1)
- .setLastFetchTimestamp(observerFetchTimeMs)
- .setLastCaughtUpTimestamp(observerFetchTimeMs),
- observerState);
- }
-
- @ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testDescribeQuorumWithVotersAndObservers(boolean
withDirectoryId) {
- MockTime time = new MockTime();
- ReplicaKey nodeKey1 = replicaKey(1, withDirectoryId);
- ReplicaKey nodeKey2 = replicaKey(2, withDirectoryId);
- long epochStartOffset = 10L;
-
- VoterSet voters = localWithRemoteVoterSet(Stream.of(nodeKey1,
nodeKey2), withDirectoryId);
- LeaderState<?> state = newLeaderState(voters, epochStartOffset);
-
-
- assertFalse(state.updateLocalState(new
LogOffsetMetadata(epochStartOffset + 1), voters));
- assertTrue(state.updateReplicaState(nodeKey2, 0, new
LogOffsetMetadata(epochStartOffset + 1)));
- assertEquals(Optional.of(new LogOffsetMetadata(epochStartOffset + 1)),
state.highWatermark());
-
- // node1 becomes an observer
- long fetchTimeMs = time.milliseconds();
- assertFalse(state.updateReplicaState(nodeKey1, fetchTimeMs, new
LogOffsetMetadata(epochStartOffset + 1)));
- VoterSet votersWithoutNode1 = voters.removeVoter(nodeKey1).get();
- state.updateLocalState(new LogOffsetMetadata(epochStartOffset + 5),
votersWithoutNode1);
-
- time.sleep(500);
- DescribeQuorumResponseData.PartitionData partitionData =
state.describeQuorum(time.milliseconds());
- assertEquals(epochStartOffset + 1, partitionData.highWatermark());
- assertEquals(localReplicaKey.id(), partitionData.leaderId());
- assertEquals(epoch, partitionData.leaderEpoch());
- DescribeQuorumResponseData.ReplicaState observer =
partitionData.observers().get(0);
- assertEquals(nodeKey1.id(), observer.replicaId());
- // KAFKA-16953 will add support for including the directory id
- assertEquals(
- ReplicaKey.NO_DIRECTORY_ID,
- observer.replicaDirectoryId()
- );
- assertEquals(epochStartOffset + 1, observer.logEndOffset());
- assertEquals(2, partitionData.currentVoters().size());
-
- // node1 catches up with leader, HW should not change
- time.sleep(500);
- fetchTimeMs = time.milliseconds();
- assertFalse(state.updateReplicaState(nodeKey1, fetchTimeMs, new
LogOffsetMetadata(epochStartOffset + 5)));
- assertEquals(Optional.of(new LogOffsetMetadata(epochStartOffset + 1)),
state.highWatermark());
-
- // node1 becomes a voter again, HW should change
- assertTrue(state.updateLocalState(new
LogOffsetMetadata(epochStartOffset + 10), voters));
-
- time.sleep(500);
- partitionData = state.describeQuorum(time.milliseconds());
- assertEquals(epochStartOffset + 5, partitionData.highWatermark());
- assertEquals(localReplicaKey.id(), partitionData.leaderId());
- assertEquals(epoch, partitionData.leaderEpoch());
- assertEquals(Collections.emptyList(), partitionData.observers());
- assertEquals(3, partitionData.currentVoters().size());
- DescribeQuorumResponseData.ReplicaState node1State = partitionData
- .currentVoters()
- .stream()
- .filter(replicaState -> replicaState.replicaId() == nodeKey1.id())
- .findFirst()
- .get();
- assertEquals(epochStartOffset + 5, node1State.logEndOffset());
- assertEquals(fetchTimeMs, node1State.lastFetchTimestamp());
- }
-
- @Test
- public void testClearInactiveObserversIgnoresLeader() {
- MockTime time = new MockTime();
- ReplicaKey followerKey = ReplicaKey.of(1, Uuid.randomUuid());
- ReplicaKey observerKey = ReplicaKey.of(10, Uuid.randomUuid());
- long epochStartOffset = 10L;
-
- VoterSet voters = localWithRemoteVoterSet(Stream.of(followerKey),
true);
- LeaderState<?> state = newLeaderState(voters, epochStartOffset);
-
- assertFalse(state.updateLocalState(new
LogOffsetMetadata(epochStartOffset + 1), voters));
- assertTrue(state.updateReplicaState(followerKey, time.milliseconds(),
new LogOffsetMetadata(epochStartOffset + 1)));
-
- // observer is returned since its lastFetchTimestamp is within
OBSERVER_SESSION_TIMEOUT_MS
- time.sleep(500);
- long observerFetchTimeMs = time.milliseconds();
- assertFalse(state.updateReplicaState(observerKey, observerFetchTimeMs,
new LogOffsetMetadata(epochStartOffset + 1)));
-
- time.sleep(500);
- DescribeQuorumResponseData.PartitionData partitionData =
state.describeQuorum(time.milliseconds());
- assertEquals(epochStartOffset + 1, partitionData.highWatermark());
- assertEquals(localReplicaKey.id(), partitionData.leaderId());
- assertEquals(2, partitionData.currentVoters().size());
- assertEquals(1, partitionData.observers().size());
- assertEquals(observerKey.id(),
partitionData.observers().get(0).replicaId());
-
- // observer is not returned once its lastFetchTimestamp surpasses
OBSERVER_SESSION_TIMEOUT_MS
- time.sleep(LeaderState.OBSERVER_SESSION_TIMEOUT_MS);
- partitionData = state.describeQuorum(time.milliseconds());
- assertEquals(epochStartOffset + 1, partitionData.highWatermark());
- assertEquals(localReplicaKey.id(), partitionData.leaderId());
- assertEquals(2, partitionData.currentVoters().size());
- assertEquals(0, partitionData.observers().size());
-
- // leader becomes observer
- VoterSet votersWithoutLeader =
voters.removeVoter(localReplicaKey).get();
- assertFalse(state.updateLocalState(new
LogOffsetMetadata(epochStartOffset + 10), votersWithoutLeader));
-
- // leader should be returned in describe quorum output
- time.sleep(LeaderState.OBSERVER_SESSION_TIMEOUT_MS);
- long describeQuorumCalledTime = time.milliseconds();
- partitionData = state.describeQuorum(describeQuorumCalledTime);
- assertEquals(epochStartOffset + 1, partitionData.highWatermark());
- assertEquals(localReplicaKey.id(), partitionData.leaderId());
- assertEquals(1, partitionData.currentVoters().size());
- assertEquals(1, partitionData.observers().size());
- DescribeQuorumResponseData.ReplicaState observer =
partitionData.observers().get(0);
- assertEquals(localReplicaKey.id(), observer.replicaId());
- assertEquals(describeQuorumCalledTime, observer.lastFetchTimestamp());
- }
-
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testCheckQuorum(boolean withDirectoryId) {
@@ -1026,63 +570,6 @@ public class LeaderStateTest {
assertEquals(Optional.of(new LogOffsetMetadata(13L)),
state.highWatermark());
}
- @Test
- public void testNoOpForNegativeRemoteNodeId() {
- MockTime time = new MockTime();
- int replicaId = -1;
- long epochStartOffset = 10L;
-
- LeaderState<?> state = newLeaderState(
- VoterSetTest.voterSet(Stream.of(localReplicaKey)),
- epochStartOffset
- );
- assertFalse(
- state.updateReplicaState(
- ReplicaKey.of(replicaId, ReplicaKey.NO_DIRECTORY_ID),
- 0,
- new LogOffsetMetadata(epochStartOffset)
- )
- );
-
- DescribeQuorumResponseData.PartitionData partitionData =
state.describeQuorum(time.milliseconds());
- List<DescribeQuorumResponseData.ReplicaState> observerStates =
partitionData.observers();
- assertEquals(Collections.emptyList(), observerStates);
- }
-
- @ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testObserverStateExpiration(boolean withDirectoryId) {
- MockTime time = new MockTime();
- ReplicaKey observerKey = replicaKey(10, withDirectoryId);
- long epochStartOffset = 10L;
-
- LeaderState<?> state = newLeaderState(
- VoterSetTest.voterSet(Stream.of(localReplicaKey)),
- epochStartOffset
- );
-
- state.updateReplicaState(
- observerKey,
- time.milliseconds(),
- new LogOffsetMetadata(epochStartOffset)
- );
- DescribeQuorumResponseData.PartitionData partitionData =
state.describeQuorum(time.milliseconds());
- List<DescribeQuorumResponseData.ReplicaState> observerStates =
partitionData.observers();
- assertEquals(1, observerStates.size());
-
- DescribeQuorumResponseData.ReplicaState observerState =
observerStates.get(0);
- assertEquals(observerKey.id(), observerState.replicaId());
- // KAFKA-16953 will add support for including the directory id
- assertEquals(
- ReplicaKey.NO_DIRECTORY_ID,
- observerState.replicaDirectoryId()
- );
-
- time.sleep(LeaderState.OBSERVER_SESSION_TIMEOUT_MS);
- partitionData = state.describeQuorum(time.milliseconds());
- assertEquals(Collections.emptyList(), partitionData.observers());
- }
-
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testGrantVote(boolean isLogUpToDate) {
@@ -1147,36 +634,6 @@ public class LeaderStateTest {
}
}
- private DescribeQuorumResponseData.ReplicaState describeVoterState(
- LeaderState<?> state,
- int voterId,
- long currentTimeMs
- ) {
- DescribeQuorumResponseData.PartitionData partitionData =
state.describeQuorum(currentTimeMs);
- return findReplicaOrFail(voterId, partitionData.currentVoters());
- }
-
- private DescribeQuorumResponseData.ReplicaState describeObserverState(
- LeaderState<?> state,
- int observerId,
- long currentTimeMs
- ) {
- DescribeQuorumResponseData.PartitionData partitionData =
state.describeQuorum(currentTimeMs);
- return findReplicaOrFail(observerId, partitionData.observers());
- }
-
- private DescribeQuorumResponseData.ReplicaState findReplicaOrFail(
- int replicaId,
- List<DescribeQuorumResponseData.ReplicaState> replicas
- ) {
- return replicas.stream()
- .filter(observer -> observer.replicaId() == replicaId)
- .findFirst()
- .orElseThrow(() -> new AssertionError(
- "Failed to find expected replica state for replica " +
replicaId
- ));
- }
-
private ReplicaKey replicaKey(int id, boolean withDirectoryId) {
Uuid directoryId = withDirectoryId ? Uuid.randomUuid() :
ReplicaKey.NO_DIRECTORY_ID;
return ReplicaKey.of(id, directoryId);
diff --git
a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index cab37a3241e..695af19f7b1 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -650,19 +650,41 @@ public final class RaftClientTestContext {
long highWatermark,
List<ReplicaState> voterStates,
List<ReplicaState> observerStates
+ ) {
+ assertSentDescribeQuorumResponse(Errors.NONE, leaderId, leaderEpoch,
highWatermark, voterStates, observerStates);
+ }
+
+ void assertSentDescribeQuorumResponse(
+ Errors error,
+ int leaderId,
+ int leaderEpoch,
+ long highWatermark,
+ List<ReplicaState> voterStates,
+ List<ReplicaState> observerStates
) {
DescribeQuorumResponseData response = collectDescribeQuorumResponse();
DescribeQuorumResponseData.PartitionData partitionData = new
DescribeQuorumResponseData.PartitionData()
- .setErrorCode(Errors.NONE.code())
+ .setErrorCode(error.code())
.setLeaderId(leaderId)
.setLeaderEpoch(leaderEpoch)
.setHighWatermark(highWatermark)
.setCurrentVoters(voterStates)
.setObservers(observerStates);
- // KAFKA-16953 will add support for including the node listeners in
the node collection
- DescribeQuorumResponseData.NodeCollection nodes = new
DescribeQuorumResponseData.NodeCollection();
+ if (!error.equals(Errors.NONE)) {
+ partitionData.setErrorMessage(error.message());
+ }
+
+ DescribeQuorumResponseData.NodeCollection nodes = new
DescribeQuorumResponseData.NodeCollection(0);
+ if (describeQuorumRpcVersion() >= 2) {
+ nodes = new
DescribeQuorumResponseData.NodeCollection(voterStates.size());
+ for (ReplicaState voterState : voterStates) {
+ nodes.add(new DescribeQuorumResponseData.Node()
+ .setNodeId(voterState.replicaId())
+
.setListeners(startingVoters.listeners(voterState.replicaId()).toDescribeQuorumResponseListeners()));
+ }
+ }
DescribeQuorumResponseData expectedResponse =
DescribeQuorumResponse.singletonResponse(
metadataPartition,
@@ -680,6 +702,7 @@ public final class RaftClientTestContext {
.sorted(Comparator.comparingInt(ReplicaState::replicaId))
.collect(Collectors.toList());
response.topics().get(0).partitions().get(0).setCurrentVoters(sortedVoters);
+
response.nodes().sort(Comparator.comparingInt(DescribeQuorumResponseData.Node::nodeId));
assertEquals(expectedResponse, response);
}