hachikuji commented on code in PR #12508:
URL: https://github.com/apache/kafka/pull/12508#discussion_r948237284


##########
raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java:
##########
@@ -119,6 +122,93 @@ public void testNonMonotonicLocalEndOffsetUpdate() {
             () -> state.updateLocalState(0, new LogOffsetMetadata(15L)));
     }
 
+    @Test
+    public void testLastCaughtUpTimeVoters() {
+        int node1 = 1;
+        int node2 = 2;
+        int currentTime = 1000;
+        int fetchTime = 0;
+        int caughtupTime = -1;
+        LeaderState<?> state = newLeaderState(mkSet(localId, node1, node2), 
10L);
+        assertEquals(Optional.empty(), state.highWatermark());
+        assertFalse(state.updateLocalState(++fetchTime, new 
LogOffsetMetadata(10L)));
+        assertEquals(mkSet(node1, node2), state.nonAcknowledgingVoters());
+        assertEquals(Optional.empty(), state.highWatermark());
+
+        // Node 1 falls behind
+        assertFalse(state.updateLocalState(++fetchTime, new 
LogOffsetMetadata(10L)));
+        assertFalse(state.updateReplicaState(node1, ++fetchTime, new 
LogOffsetMetadata(10L), 11L));
+        
assertEquals(state.quorumResponseVoterStates(currentTime).get(localId).lastCaughtUpTimestamp(),
 currentTime);
+        
assertEquals(state.quorumResponseVoterStates(currentTime).get(node1).lastCaughtUpTimestamp(),
 caughtupTime);
+
+        // Node 1 catches up to leader
+        assertFalse(state.updateReplicaState(node1, ++fetchTime, new 
LogOffsetMetadata(11L), 11L));
+        caughtupTime = fetchTime;
+        
assertEquals(state.quorumResponseVoterStates(currentTime).get(localId).lastCaughtUpTimestamp(),
 currentTime);
+        
assertEquals(state.quorumResponseVoterStates(currentTime).get(node1).lastCaughtUpTimestamp(),
 caughtupTime);
+
+        // Node 1 falls behind
+        assertFalse(state.updateReplicaState(node1, ++fetchTime, new 
LogOffsetMetadata(50L), 100L));
+        
assertEquals(state.quorumResponseVoterStates(currentTime).get(localId).lastCaughtUpTimestamp(),
 currentTime);
+        
assertEquals(state.quorumResponseVoterStates(currentTime).get(node1).lastCaughtUpTimestamp(),
 caughtupTime);
+
+        // Node 1 catches up to the last fetch offset
+        int prevFetchTime = fetchTime;
+        assertFalse(state.updateReplicaState(node1, ++fetchTime, new 
LogOffsetMetadata(102L), 200L));
+        caughtupTime = prevFetchTime;
+        
assertEquals(state.quorumResponseVoterStates(currentTime).get(localId).lastCaughtUpTimestamp(),
 currentTime);
+        
assertEquals(state.quorumResponseVoterStates(currentTime).get(node1).lastCaughtUpTimestamp(),
 caughtupTime);
+
+        // Node2 has never caught up to leader
+        
assertEquals(state.quorumResponseVoterStates(currentTime).get(node2).lastCaughtUpTimestamp(),
 -1L);
+        assertTrue(state.updateReplicaState(node2, ++fetchTime, new 
LogOffsetMetadata(202L), 300L));
+        
assertEquals(state.quorumResponseVoterStates(currentTime).get(node2).lastCaughtUpTimestamp(),
 -1L);
+        assertFalse(state.updateReplicaState(node2, ++fetchTime, new 
LogOffsetMetadata(250L), 300L));
+        
assertEquals(state.quorumResponseVoterStates(currentTime).get(node2).lastCaughtUpTimestamp(),
 -1L);
+    }
+
+    @Test
+    public void testLastCaughtUpTimeObserver() {
+        int node1Index = 0;
+        int node1Id = 1;
+        int currentTime = 1000;
+        int fetchTime = 0;
+        int caughtupTime = -1;

Review Comment:
   nit: `caughtUpTime`?



##########
raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java:
##########
@@ -224,7 +314,9 @@ public void testGetVoterStates() {
             mkEntry(localId, leaderEndOffset),
             mkEntry(node1, leaderStartOffset),
             mkEntry(node2, leaderEndOffset)
-        ), state.getVoterEndOffsets());
+        ), state.quorumResponseVoterStates(0)
+                .stream()

Review Comment:
   nit: the indentation looks a bit strange here; it should probably be 4
spaces. The same applies below on lines 344 and 368.



##########
raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java:
##########
@@ -188,13 +278,13 @@ public void testNonMonotonicHighWatermarkUpdate() {
         int node1 = 1;
         LeaderState<?> state = newLeaderState(mkSet(localId, node1), 0L);
         state.updateLocalState(time.milliseconds(), new 
LogOffsetMetadata(10L));
-        state.updateReplicaState(node1, time.milliseconds(), new 
LogOffsetMetadata(10L));
+        state.updateReplicaState(node1, time.milliseconds(), new 
LogOffsetMetadata(10L), 11L);
         assertEquals(Optional.of(new LogOffsetMetadata(10L)), 
state.highWatermark());
 
         // Follower crashes and disk is lost. It fetches an earlier offset to 
rebuild state.
         // The leader will report an error in the logs, but will not let the 
high watermark rewind
-        assertFalse(state.updateReplicaState(node1, time.milliseconds(), new 
LogOffsetMetadata(5L)));
-        assertEquals(5L, state.getVoterEndOffsets().get(node1));
+        assertFalse(state.updateReplicaState(node1, time.milliseconds(), new 
LogOffsetMetadata(5L), 6L));

Review Comment:
   Why does the leader end offset go backwards here? Previously it was at 11.



##########
raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java:
##########
@@ -119,6 +122,93 @@ public void testNonMonotonicLocalEndOffsetUpdate() {
             () -> state.updateLocalState(0, new LogOffsetMetadata(15L)));
     }
 
+    @Test
+    public void testLastCaughtUpTimeVoters() {
+        int node1 = 1;
+        int node2 = 2;
+        int currentTime = 1000;
+        int fetchTime = 0;
+        int caughtupTime = -1;
+        LeaderState<?> state = newLeaderState(mkSet(localId, node1, node2), 
10L);
+        assertEquals(Optional.empty(), state.highWatermark());
+        assertFalse(state.updateLocalState(++fetchTime, new 
LogOffsetMetadata(10L)));
+        assertEquals(mkSet(node1, node2), state.nonAcknowledgingVoters());
+        assertEquals(Optional.empty(), state.highWatermark());
+
+        // Node 1 falls behind
+        assertFalse(state.updateLocalState(++fetchTime, new 
LogOffsetMetadata(10L)));
+        assertFalse(state.updateReplicaState(node1, ++fetchTime, new 
LogOffsetMetadata(10L), 11L));
+        
assertEquals(state.quorumResponseVoterStates(currentTime).get(localId).lastCaughtUpTimestamp(),
 currentTime);
+        
assertEquals(state.quorumResponseVoterStates(currentTime).get(node1).lastCaughtUpTimestamp(),
 caughtupTime);
+
+        // Node 1 catches up to leader
+        assertFalse(state.updateReplicaState(node1, ++fetchTime, new 
LogOffsetMetadata(11L), 11L));
+        caughtupTime = fetchTime;
+        
assertEquals(state.quorumResponseVoterStates(currentTime).get(localId).lastCaughtUpTimestamp(),
 currentTime);
+        
assertEquals(state.quorumResponseVoterStates(currentTime).get(node1).lastCaughtUpTimestamp(),
 caughtupTime);
+
+        // Node 1 falls behind
+        assertFalse(state.updateReplicaState(node1, ++fetchTime, new 
LogOffsetMetadata(50L), 100L));
+        
assertEquals(state.quorumResponseVoterStates(currentTime).get(localId).lastCaughtUpTimestamp(),
 currentTime);
+        
assertEquals(state.quorumResponseVoterStates(currentTime).get(node1).lastCaughtUpTimestamp(),
 caughtupTime);
+
+        // Node 1 catches up to the last fetch offset
+        int prevFetchTime = fetchTime;
+        assertFalse(state.updateReplicaState(node1, ++fetchTime, new 
LogOffsetMetadata(102L), 200L));
+        caughtupTime = prevFetchTime;
+        
assertEquals(state.quorumResponseVoterStates(currentTime).get(localId).lastCaughtUpTimestamp(),
 currentTime);
+        
assertEquals(state.quorumResponseVoterStates(currentTime).get(node1).lastCaughtUpTimestamp(),
 caughtupTime);
+
+        // Node2 has never caught up to leader
+        
assertEquals(state.quorumResponseVoterStates(currentTime).get(node2).lastCaughtUpTimestamp(),
 -1L);
+        assertTrue(state.updateReplicaState(node2, ++fetchTime, new 
LogOffsetMetadata(202L), 300L));
+        
assertEquals(state.quorumResponseVoterStates(currentTime).get(node2).lastCaughtUpTimestamp(),
 -1L);
+        assertFalse(state.updateReplicaState(node2, ++fetchTime, new 
LogOffsetMetadata(250L), 300L));
+        
assertEquals(state.quorumResponseVoterStates(currentTime).get(node2).lastCaughtUpTimestamp(),
 -1L);
+    }
+
+    @Test
+    public void testLastCaughtUpTimeObserver() {
+        int node1Index = 0;
+        int node1Id = 1;
+        int currentTime = 1000;
+        int fetchTime = 0;
+        int caughtupTime = -1;
+        LeaderState<?> state = newLeaderState(singleton(localId), 5L);
+        assertEquals(Optional.empty(), state.highWatermark());
+        assertEquals(emptySet(), state.nonAcknowledgingVoters());
+
+        // Node 1 falls behind
+        assertTrue(state.updateLocalState(++fetchTime, new 
LogOffsetMetadata(10L)));
+        assertFalse(state.updateReplicaState(node1Id, ++fetchTime, new 
LogOffsetMetadata(10L), 11L));
+        
assertEquals(state.quorumResponseVoterStates(currentTime).get(localId).lastCaughtUpTimestamp(),
 currentTime);

Review Comment:
   The expected value should always be the first argument to `assertEquals`.
There are a few of these in this test class.



##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -217,20 +219,36 @@ public boolean updateLocalState(long fetchTimestamp, 
LogOffsetMetadata logOffset
      * @param replicaId replica id
      * @param fetchTimestamp fetch timestamp
      * @param logOffsetMetadata new log offset and metadata
+     * @param leaderLogEndOffset current log end offset of the leader
      * @return true if the high watermark is updated too
      */
-    public boolean updateReplicaState(int replicaId,
-                                      long fetchTimestamp,
-                                      LogOffsetMetadata logOffsetMetadata) {
+    public boolean updateReplicaState(
+        int replicaId,
+        long fetchTimestamp,
+        LogOffsetMetadata logOffsetMetadata,
+        long leaderLogEndOffset
+    ) {
         // Ignore fetches from negative replica id, as it indicates
         // the fetch is from non-replica. For example, a consumer.
         if (replicaId < 0) {
             return false;
         }
 
         ReplicaState state = getReplicaState(replicaId);
-        state.updateFetchTimestamp(fetchTimestamp);
+
+        // Only proceed with updating the states if the offset update is valid
+        verifyEndOffsetUpdate(state, logOffsetMetadata);
+
+        // Update the Last CaughtUp Time
+        if (logOffsetMetadata.offset >= leaderLogEndOffset) {
+            state.updateLastCaughtUpTimestamp(fetchTimestamp);
+        } else if (logOffsetMetadata.offset >= 
state.lastFetchLeaderLogEndOffset.orElse(-1L)) {
+            
state.updateLastCaughtUpTimestamp(state.lastFetchTimestamp.orElse(-1L));
+        }
+
+        state.updateFetchTimestamp(fetchTimestamp, leaderLogEndOffset);
         return updateEndOffset(state, logOffsetMetadata);
+

Review Comment:
   nit: unneeded newline



##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -246,20 +264,26 @@ private List<ReplicaState> 
followersByDescendingFetchOffset() {
             .collect(Collectors.toList());
     }
 
-    private boolean updateEndOffset(ReplicaState state,
-                                    LogOffsetMetadata endOffsetMetadata) {
+    private void verifyEndOffsetUpdate(
+        ReplicaState state,
+        LogOffsetMetadata endOffsetMetadata
+    ) {
         state.endOffset.ifPresent(currentEndOffset -> {
             if (currentEndOffset.offset > endOffsetMetadata.offset) {
                 if (state.nodeId == localId) {
                     throw new IllegalStateException("Detected non-monotonic 
update of local " +
-                        "end offset: " + currentEndOffset.offset + " -> " + 
endOffsetMetadata.offset);
+                            "end offset: " + currentEndOffset.offset + " -> " 
+ endOffsetMetadata.offset);

Review Comment:
   nit: this indentation should be reverted



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to