This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d5151f6  KAFKA-10828; Replacing endorsing with acknowledging for voters  (#9737)
d5151f6 is described below

commit d5151f6f095896799296aecaa483a76981425ed5
Author: vamossagar12 <[email protected]>
AuthorDate: Tue Dec 22 23:35:07 2020 +0530

    KAFKA-10828; Replacing endorsing with acknowledging for voters  (#9737)
    
    This PR replaces the terms endorsing with acknowledging for voters which have recognised the current leader.
    
    Reviewers: Jason Gustafson <[email protected]>
---
 .../org/apache/kafka/raft/KafkaRaftClient.java     |  4 +--
 .../java/org/apache/kafka/raft/LeaderState.java    | 36 +++++++++++++---------
 .../org/apache/kafka/raft/KafkaRaftClientTest.java |  2 +-
 .../org/apache/kafka/raft/LeaderStateTest.java     | 20 ++++++------
 4 files changed, 34 insertions(+), 28 deletions(-)

diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 37eee73..2abbebf 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -731,7 +731,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
         } else if (partitionError == Errors.NONE) {
             if (quorum.isLeader()) {
                 LeaderState state = quorum.leaderStateOrThrow();
-                state.addEndorsementFrom(remoteNodeId);
+                state.addAcknowledgementFrom(remoteNodeId);
             } else {
                 logger.debug("Ignoring BeginQuorumEpoch response {} since " +
                     "this node is not the leader anymore", response);
@@ -1600,7 +1600,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
 
         long timeUntilSend = maybeSendRequests(
             currentTimeMs,
-            state.nonEndorsingVoters(),
+            state.nonAcknowledgingVoters(),
             this::buildBeginQuorumEpochRequest
         );
 
diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
index c44f0f0..c35abf1 100644
--- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
@@ -26,6 +26,12 @@ import java.util.OptionalLong;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+/**
+ * In the context of LeaderState, an acknowledged voter means one who has acknowledged the current leader by either
+ * responding to a `BeginQuorumEpoch` request from the leader or by beginning to send `Fetch` requests.
+ * More specifically, the set of unacknowledged voters are targets for BeginQuorumEpoch requests from the leader until
+ * they acknowledge the leader.
+ */
 public class LeaderState implements EpochState {
     private final int localId;
     private final int epoch;
@@ -50,8 +56,8 @@ public class LeaderState implements EpochState {
         this.highWatermark = Optional.empty();
 
         for (int voterId : voters) {
-            boolean hasEndorsedLeader = voterId == localId;
-            this.voterReplicaStates.put(voterId, new VoterState(voterId, 
hasEndorsedLeader));
+            boolean hasAcknowledgedLeader = voterId == localId;
+            this.voterReplicaStates.put(voterId, new VoterState(voterId, 
hasAcknowledgedLeader));
         }
         this.grantingVoters.addAll(grantingVoters);
     }
@@ -83,13 +89,13 @@ public class LeaderState implements EpochState {
         return localId;
     }
 
-    public Set<Integer> nonEndorsingVoters() {
-        Set<Integer> nonEndorsing = new HashSet<>();
+    public Set<Integer> nonAcknowledgingVoters() {
+        Set<Integer> nonAcknowledging = new HashSet<>();
         for (VoterState state : voterReplicaStates.values()) {
-            if (!state.hasEndorsedLeader)
-                nonEndorsing.add(state.nodeId);
+            if (!state.hasAcknowledgedLeader)
+                nonAcknowledging.add(state.nodeId);
         }
-        return nonEndorsing;
+        return nonAcknowledging;
     }
 
     private boolean updateHighWatermark() {
@@ -180,22 +186,22 @@ public class LeaderState implements EpochState {
         state.endOffset = Optional.of(endOffsetMetadata);
 
         if (isVoter(state.nodeId)) {
-            ((VoterState) state).hasEndorsedLeader = true;
-            addEndorsementFrom(state.nodeId);
+            ((VoterState) state).hasAcknowledgedLeader = true;
+            addAcknowledgementFrom(state.nodeId);
             return updateHighWatermark();
         }
         return false;
     }
 
-    public void addEndorsementFrom(int remoteNodeId) {
+    public void addAcknowledgementFrom(int remoteNodeId) {
         VoterState voterState = ensureValidVoter(remoteNodeId);
-        voterState.hasEndorsedLeader = true;
+        voterState.hasAcknowledgedLeader = true;
     }
 
     private VoterState ensureValidVoter(int remoteNodeId) {
         VoterState state = voterReplicaStates.get(remoteNodeId);
         if (state == null)
-            throw new IllegalArgumentException("Unexpected endorsement from 
non-voter " + remoteNodeId);
+            throw new IllegalArgumentException("Unexpected acknowledgement 
from non-voter " + remoteNodeId);
         return state;
     }
 
@@ -272,12 +278,12 @@ public class LeaderState implements EpochState {
     }
 
     private static class VoterState extends ReplicaState {
-        boolean hasEndorsedLeader;
+        boolean hasAcknowledgedLeader;
 
         public VoterState(int nodeId,
-                          boolean hasEndorsedLeader) {
+                          boolean hasAcknowledgedLeader) {
             super(nodeId);
-            this.hasEndorsedLeader = hasEndorsedLeader;
+            this.hasAcknowledgedLeader = hasAcknowledgedLeader;
         }
     }
 
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index 43fb230..60f4b98 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -1681,7 +1681,7 @@ public class KafkaRaftClientTest {
     }
 
     @Test
-    public void testFetchShouldBeTreatedAsLeaderEndorsement() throws Exception 
{
+    public void testFetchShouldBeTreatedAsLeaderAcknowledgement() throws 
Exception {
         int localId = 0;
         int otherNodeId = 1;
         int epoch = 5;
diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
index 197ffbc..8ec91a7 100644
--- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
@@ -36,22 +36,22 @@ public class LeaderStateTest {
     private final int epoch = 5;
 
     @Test
-    public void testFollowerEndorsement() {
+    public void testFollowerAcknowledgement() {
         int node1 = 1;
         int node2 = 2;
         LeaderState state = new LeaderState(localId, epoch, 0L, mkSet(localId, 
node1, node2), Collections.emptySet());
-        assertEquals(mkSet(node1, node2), state.nonEndorsingVoters());
-        state.addEndorsementFrom(node1);
-        assertEquals(Collections.singleton(node2), state.nonEndorsingVoters());
-        state.addEndorsementFrom(node2);
-        assertEquals(Collections.emptySet(), state.nonEndorsingVoters());
+        assertEquals(mkSet(node1, node2), state.nonAcknowledgingVoters());
+        state.addAcknowledgementFrom(node1);
+        assertEquals(Collections.singleton(node2), 
state.nonAcknowledgingVoters());
+        state.addAcknowledgementFrom(node2);
+        assertEquals(Collections.emptySet(), state.nonAcknowledgingVoters());
     }
 
     @Test
-    public void testNonFollowerEndorsement() {
+    public void testNonFollowerAcknowledgement() {
         int nonVoterId = 1;
         LeaderState state = new LeaderState(localId, epoch, 0L, 
Collections.singleton(localId), Collections.emptySet());
-        assertThrows(IllegalArgumentException.class, () -> 
state.addEndorsementFrom(nonVoterId));
+        assertThrows(IllegalArgumentException.class, () -> 
state.addAcknowledgementFrom(nonVoterId));
     }
 
     @Test
@@ -102,7 +102,7 @@ public class LeaderStateTest {
         assertFalse(state.updateLocalState(0, new LogOffsetMetadata(15L)));
         assertEquals(Optional.empty(), state.highWatermark());
         assertTrue(state.updateReplicaState(otherNodeId, 0, new 
LogOffsetMetadata(10L)));
-        assertEquals(Collections.emptySet(), state.nonEndorsingVoters());
+        assertEquals(Collections.emptySet(), state.nonAcknowledgingVoters());
         assertEquals(Optional.of(new LogOffsetMetadata(10L)), 
state.highWatermark());
         assertTrue(state.updateReplicaState(otherNodeId, 0, new 
LogOffsetMetadata(15L)));
         assertEquals(Optional.of(new LogOffsetMetadata(15L)), 
state.highWatermark());
@@ -128,7 +128,7 @@ public class LeaderStateTest {
         assertFalse(state.updateLocalState(0, new LogOffsetMetadata(15L)));
         assertEquals(Optional.empty(), state.highWatermark());
         assertTrue(state.updateReplicaState(node1, 0, new 
LogOffsetMetadata(10L)));
-        assertEquals(Collections.singleton(node2), state.nonEndorsingVoters());
+        assertEquals(Collections.singleton(node2), 
state.nonAcknowledgingVoters());
         assertEquals(Optional.of(new LogOffsetMetadata(10L)), 
state.highWatermark());
         assertTrue(state.updateReplicaState(node2, 0, new 
LogOffsetMetadata(15L)));
         assertEquals(Optional.of(new LogOffsetMetadata(15L)), 
state.highWatermark());

Reply via email to