This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d5151f6 KAFKA-10828; Replacing endorsing with acknowledging for
voters (#9737)
d5151f6 is described below
commit d5151f6f095896799296aecaa483a76981425ed5
Author: vamossagar12 <[email protected]>
AuthorDate: Tue Dec 22 23:35:07 2020 +0530
KAFKA-10828; Replacing endorsing with acknowledging for voters (#9737)
This PR replaces the terms endorsing with acknowledging for voters which
have recognised the current leader.
Reviewers: Jason Gustafson <[email protected]>
---
.../org/apache/kafka/raft/KafkaRaftClient.java | 4 +--
.../java/org/apache/kafka/raft/LeaderState.java | 36 +++++++++++++---------
.../org/apache/kafka/raft/KafkaRaftClientTest.java | 2 +-
.../org/apache/kafka/raft/LeaderStateTest.java | 20 ++++++------
4 files changed, 34 insertions(+), 28 deletions(-)
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 37eee73..2abbebf 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -731,7 +731,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
} else if (partitionError == Errors.NONE) {
if (quorum.isLeader()) {
LeaderState state = quorum.leaderStateOrThrow();
- state.addEndorsementFrom(remoteNodeId);
+ state.addAcknowledgementFrom(remoteNodeId);
} else {
logger.debug("Ignoring BeginQuorumEpoch response {} since " +
"this node is not the leader anymore", response);
@@ -1600,7 +1600,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
long timeUntilSend = maybeSendRequests(
currentTimeMs,
- state.nonEndorsingVoters(),
+ state.nonAcknowledgingVoters(),
this::buildBeginQuorumEpochRequest
);
diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
index c44f0f0..c35abf1 100644
--- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
@@ -26,6 +26,12 @@ import java.util.OptionalLong;
import java.util.Set;
import java.util.stream.Collectors;
+/**
+ * In the context of LeaderState, an acknowledged voter means one who has
acknowledged the current leader by either
+ * responding to a `BeginQuorumEpoch` request from the leader or by beginning
to send `Fetch` requests.
+ * More specifically, the set of unacknowledged voters are targets for
BeginQuorumEpoch requests from the leader until
+ * they acknowledge the leader.
+ */
public class LeaderState implements EpochState {
private final int localId;
private final int epoch;
@@ -50,8 +56,8 @@ public class LeaderState implements EpochState {
this.highWatermark = Optional.empty();
for (int voterId : voters) {
- boolean hasEndorsedLeader = voterId == localId;
- this.voterReplicaStates.put(voterId, new VoterState(voterId,
hasEndorsedLeader));
+ boolean hasAcknowledgedLeader = voterId == localId;
+ this.voterReplicaStates.put(voterId, new VoterState(voterId,
hasAcknowledgedLeader));
}
this.grantingVoters.addAll(grantingVoters);
}
@@ -83,13 +89,13 @@ public class LeaderState implements EpochState {
return localId;
}
- public Set<Integer> nonEndorsingVoters() {
- Set<Integer> nonEndorsing = new HashSet<>();
+ public Set<Integer> nonAcknowledgingVoters() {
+ Set<Integer> nonAcknowledging = new HashSet<>();
for (VoterState state : voterReplicaStates.values()) {
- if (!state.hasEndorsedLeader)
- nonEndorsing.add(state.nodeId);
+ if (!state.hasAcknowledgedLeader)
+ nonAcknowledging.add(state.nodeId);
}
- return nonEndorsing;
+ return nonAcknowledging;
}
private boolean updateHighWatermark() {
@@ -180,22 +186,22 @@ public class LeaderState implements EpochState {
state.endOffset = Optional.of(endOffsetMetadata);
if (isVoter(state.nodeId)) {
- ((VoterState) state).hasEndorsedLeader = true;
- addEndorsementFrom(state.nodeId);
+ ((VoterState) state).hasAcknowledgedLeader = true;
+ addAcknowledgementFrom(state.nodeId);
return updateHighWatermark();
}
return false;
}
- public void addEndorsementFrom(int remoteNodeId) {
+ public void addAcknowledgementFrom(int remoteNodeId) {
VoterState voterState = ensureValidVoter(remoteNodeId);
- voterState.hasEndorsedLeader = true;
+ voterState.hasAcknowledgedLeader = true;
}
private VoterState ensureValidVoter(int remoteNodeId) {
VoterState state = voterReplicaStates.get(remoteNodeId);
if (state == null)
- throw new IllegalArgumentException("Unexpected endorsement from
non-voter " + remoteNodeId);
+ throw new IllegalArgumentException("Unexpected acknowledgement
from non-voter " + remoteNodeId);
return state;
}
@@ -272,12 +278,12 @@ public class LeaderState implements EpochState {
}
private static class VoterState extends ReplicaState {
- boolean hasEndorsedLeader;
+ boolean hasAcknowledgedLeader;
public VoterState(int nodeId,
- boolean hasEndorsedLeader) {
+ boolean hasAcknowledgedLeader) {
super(nodeId);
- this.hasEndorsedLeader = hasEndorsedLeader;
+ this.hasAcknowledgedLeader = hasAcknowledgedLeader;
}
}
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index 43fb230..60f4b98 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -1681,7 +1681,7 @@ public class KafkaRaftClientTest {
}
@Test
- public void testFetchShouldBeTreatedAsLeaderEndorsement() throws Exception
{
+ public void testFetchShouldBeTreatedAsLeaderAcknowledgement() throws
Exception {
int localId = 0;
int otherNodeId = 1;
int epoch = 5;
diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
index 197ffbc..8ec91a7 100644
--- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
@@ -36,22 +36,22 @@ public class LeaderStateTest {
private final int epoch = 5;
@Test
- public void testFollowerEndorsement() {
+ public void testFollowerAcknowledgement() {
int node1 = 1;
int node2 = 2;
LeaderState state = new LeaderState(localId, epoch, 0L, mkSet(localId,
node1, node2), Collections.emptySet());
- assertEquals(mkSet(node1, node2), state.nonEndorsingVoters());
- state.addEndorsementFrom(node1);
- assertEquals(Collections.singleton(node2), state.nonEndorsingVoters());
- state.addEndorsementFrom(node2);
- assertEquals(Collections.emptySet(), state.nonEndorsingVoters());
+ assertEquals(mkSet(node1, node2), state.nonAcknowledgingVoters());
+ state.addAcknowledgementFrom(node1);
+ assertEquals(Collections.singleton(node2),
state.nonAcknowledgingVoters());
+ state.addAcknowledgementFrom(node2);
+ assertEquals(Collections.emptySet(), state.nonAcknowledgingVoters());
}
@Test
- public void testNonFollowerEndorsement() {
+ public void testNonFollowerAcknowledgement() {
int nonVoterId = 1;
LeaderState state = new LeaderState(localId, epoch, 0L,
Collections.singleton(localId), Collections.emptySet());
- assertThrows(IllegalArgumentException.class, () ->
state.addEndorsementFrom(nonVoterId));
+ assertThrows(IllegalArgumentException.class, () ->
state.addAcknowledgementFrom(nonVoterId));
}
@Test
@@ -102,7 +102,7 @@ public class LeaderStateTest {
assertFalse(state.updateLocalState(0, new LogOffsetMetadata(15L)));
assertEquals(Optional.empty(), state.highWatermark());
assertTrue(state.updateReplicaState(otherNodeId, 0, new
LogOffsetMetadata(10L)));
- assertEquals(Collections.emptySet(), state.nonEndorsingVoters());
+ assertEquals(Collections.emptySet(), state.nonAcknowledgingVoters());
assertEquals(Optional.of(new LogOffsetMetadata(10L)),
state.highWatermark());
assertTrue(state.updateReplicaState(otherNodeId, 0, new
LogOffsetMetadata(15L)));
assertEquals(Optional.of(new LogOffsetMetadata(15L)),
state.highWatermark());
@@ -128,7 +128,7 @@ public class LeaderStateTest {
assertFalse(state.updateLocalState(0, new LogOffsetMetadata(15L)));
assertEquals(Optional.empty(), state.highWatermark());
assertTrue(state.updateReplicaState(node1, 0, new
LogOffsetMetadata(10L)));
- assertEquals(Collections.singleton(node2), state.nonEndorsingVoters());
+ assertEquals(Collections.singleton(node2),
state.nonAcknowledgingVoters());
assertEquals(Optional.of(new LogOffsetMetadata(10L)),
state.highWatermark());
assertTrue(state.updateReplicaState(node2, 0, new
LogOffsetMetadata(15L)));
assertEquals(Optional.of(new LogOffsetMetadata(15L)),
state.highWatermark());