showuon commented on code in PR #12548:
URL: https://github.com/apache/kafka/pull/12548#discussion_r953262169


##########
raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java:
##########
@@ -302,21 +302,112 @@ public void 
testGetNonLeaderFollowersByFetchOffsetDescending() {
     }
 
     @Test
-    public void testGetVoterStates() {
-        int node1 = 1;
-        int node2 = 2;
+    public void testDescribeQuorumWithSingleVoter() {
+        MockTime time = new MockTime();
         long leaderStartOffset = 10L;
         long leaderEndOffset = 15L;
 
-        LeaderState<?> state = setUpLeaderAndFollowers(node1, node2, 
leaderStartOffset, leaderEndOffset);
+        LeaderState<?> state = newLeaderState(mkSet(localId), 
leaderStartOffset);
+
+        // Until we have updated local state, high watermark should be 
uninitialized
+        assertEquals(Optional.empty(), state.highWatermark());
+        DescribeQuorumResponseData.PartitionData partitionData = 
state.describeQuorum(time.milliseconds());
+        assertEquals(-1, partitionData.highWatermark());
+        assertEquals(-1, partitionData.highWatermarkUpdateTimeMs());
+        assertEquals(localId, partitionData.leaderId());
+        assertEquals(epoch, partitionData.leaderEpoch());
+        assertEquals(Collections.emptyList(), partitionData.observers());
+        assertEquals(1, partitionData.currentVoters().size());
+        assertEquals(new DescribeQuorumResponseData.ReplicaState()
+                .setReplicaId(localId)
+                .setLogEndOffset(-1)
+                .setLastFetchTimestamp(time.milliseconds())
+                .setLastCaughtUpTimestamp(time.milliseconds()),
+            partitionData.currentVoters().get(0));
+
+
+        // Now update the high watermark and verify that describe output
+        long highWatermarkUpdateTimeMs = time.milliseconds();
+        assertTrue(state.updateLocalState(highWatermarkUpdateTimeMs, new 
LogOffsetMetadata(leaderEndOffset)));
+        assertEquals(Optional.of(new LogOffsetMetadata(leaderEndOffset)), 
state.highWatermark());
+
+        time.sleep(500);
+
+        partitionData = state.describeQuorum(time.milliseconds());
+        assertEquals(leaderEndOffset, partitionData.highWatermark());
+        assertEquals(highWatermarkUpdateTimeMs, 
partitionData.highWatermarkUpdateTimeMs());
+        assertEquals(localId, partitionData.leaderId());
+        assertEquals(epoch, partitionData.leaderEpoch());
+        assertEquals(Collections.emptyList(), partitionData.observers());
+        assertEquals(1, partitionData.currentVoters().size());
+        assertEquals(new DescribeQuorumResponseData.ReplicaState()
+                .setReplicaId(localId)
+                .setLogEndOffset(leaderEndOffset)
+                .setLastFetchTimestamp(time.milliseconds())
+                .setLastCaughtUpTimestamp(time.milliseconds()),
+            partitionData.currentVoters().get(0));
+    }
+
+    @Test
+    public void testDescribeQuorumWithMultipleVoters() {
+        MockTime time = new MockTime();
+        int activeFollowerId = 1;
+        int inactiveFollowerId = 2;
+        long leaderStartOffset = 10L;
+        long leaderEndOffset = 15L;
 
-        assertEquals(mkMap(
-            mkEntry(localId, leaderEndOffset),
-            mkEntry(node1, leaderStartOffset),
-            mkEntry(node2, leaderEndOffset)
-        ), state.quorumResponseVoterStates(0)
-            .stream()
-            
.collect(Collectors.toMap(DescribeQuorumResponseData.ReplicaState::replicaId, 
DescribeQuorumResponseData.ReplicaState::logEndOffset)));
+        LeaderState<?> state = newLeaderState(mkSet(localId, activeFollowerId, 
inactiveFollowerId), leaderStartOffset);
+        assertFalse(state.updateLocalState(time.milliseconds(), new 
LogOffsetMetadata(leaderEndOffset)));
+        assertEquals(Optional.empty(), state.highWatermark());
+
+        long activeFollowerFetchTimeMs = time.milliseconds();
+        assertTrue(state.updateReplicaState(activeFollowerId, 
activeFollowerFetchTimeMs, new LogOffsetMetadata(leaderEndOffset)));
+        assertEquals(Optional.of(new LogOffsetMetadata(leaderEndOffset)), 
state.highWatermark());
+
+        time.sleep(500);
+
+        DescribeQuorumResponseData.PartitionData partitionData = 
state.describeQuorum(time.milliseconds());
+        assertEquals(leaderEndOffset, partitionData.highWatermark());
+        assertEquals(activeFollowerFetchTimeMs, 
partitionData.highWatermarkUpdateTimeMs());
+        assertEquals(localId, partitionData.leaderId());
+        assertEquals(epoch, partitionData.leaderEpoch());
+        assertEquals(Collections.emptyList(), partitionData.observers());
+
+        List<DescribeQuorumResponseData.ReplicaState> voterStates = 
partitionData.currentVoters();
+        assertEquals(3, voterStates.size());
+
+        DescribeQuorumResponseData.ReplicaState leaderState = 
voterStates.stream()
+            .filter(voterState -> voterState.replicaId() == localId)
+            .findFirst()
+            .orElseThrow(() -> new AssertionError(""));

Review Comment:
   Please update it and below. Thanks.



##########
raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java:
##########
@@ -326,35 +417,57 @@ private LeaderState<?> setUpLeaderAndFollowers(int 
follower1,
         LeaderState<?> state = newLeaderState(mkSet(localId, follower1, 
follower2), leaderStartOffset);
         state.updateLocalState(0, new LogOffsetMetadata(leaderEndOffset));
         assertEquals(Optional.empty(), state.highWatermark());
-        state.updateReplicaState(follower1, 0, new 
LogOffsetMetadata(leaderStartOffset), leaderEndOffset);
-        state.updateReplicaState(follower2, 0, new 
LogOffsetMetadata(leaderEndOffset), leaderEndOffset);
+        state.updateReplicaState(follower1, 0, new 
LogOffsetMetadata(leaderStartOffset));
+        state.updateReplicaState(follower2, 0, new 
LogOffsetMetadata(leaderEndOffset));
         return state;
     }
 
     @Test
-    public void testGetObserverStatesWithObserver() {
+    public void testDescribeQuorumWithObservers() {
+        MockTime time = new MockTime();
         int observerId = 10;
         long epochStartOffset = 10L;
 
         LeaderState<?> state = newLeaderState(mkSet(localId), 
epochStartOffset);
-        long timestamp = 20L;
-        assertFalse(state.updateReplicaState(observerId, timestamp, new 
LogOffsetMetadata(epochStartOffset), epochStartOffset + 10));
-
-        assertEquals(Collections.singletonMap(observerId, epochStartOffset),
-                state.quorumResponseObserverStates(timestamp)
-                    .stream()
-                    
.collect(Collectors.toMap(DescribeQuorumResponseData.ReplicaState::replicaId, 
DescribeQuorumResponseData.ReplicaState::logEndOffset)));
+        long highWatermarkUpdateTime = time.milliseconds();
+        assertTrue(state.updateLocalState(time.milliseconds(), new 
LogOffsetMetadata(epochStartOffset + 1)));
+        assertEquals(Optional.of(new LogOffsetMetadata(epochStartOffset + 1)), 
state.highWatermark());
+
+        time.sleep(500);
+        long observerFetchTimeMs = time.milliseconds();
+        assertFalse(state.updateReplicaState(observerId, observerFetchTimeMs, 
new LogOffsetMetadata(epochStartOffset + 1)));
+
+        time.sleep(500);
+        DescribeQuorumResponseData.PartitionData partitionData = 
state.describeQuorum(time.milliseconds());
+        assertEquals(epochStartOffset + 1, partitionData.highWatermark());
+        assertEquals(highWatermarkUpdateTime, 
partitionData.highWatermarkUpdateTimeMs());
+        assertEquals(localId, partitionData.leaderId());
+        assertEquals(epoch, partitionData.leaderEpoch());
+
+        List<DescribeQuorumResponseData.ReplicaState> observerStates = 
partitionData.observers();
+        assertEquals(1, observerStates.size());

Review Comment:
   Any comment here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to