hachikuji commented on code in PR #12508:
URL: https://github.com/apache/kafka/pull/12508#discussion_r947244567


##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -319,57 +320,34 @@ private ReplicaState getReplicaState(int remoteNodeId) {
     }
 
     List<DescribeQuorumResponseData.ReplicaState> 
quorumResponseVoterStates(long currentTimeMs) {
-        return quorumResponseReplicaStates(voterStates, 
OptionalInt.of(localId), currentTimeMs);
+        return quorumResponseReplicaStates(voterStates.values(), 
OptionalInt.of(localId), currentTimeMs);
     }
 
     List<DescribeQuorumResponseData.ReplicaState> 
quorumResponseObserverStates(long currentTimeMs) {
         clearInactiveObservers(currentTimeMs);
-        return quorumResponseReplicaStates(observerStates, 
OptionalInt.empty(), currentTimeMs);
+        return quorumResponseReplicaStates(observerStates.values(), 
OptionalInt.empty(), currentTimeMs);
     }
 
     private static <R extends ReplicaState> 
List<DescribeQuorumResponseData.ReplicaState> quorumResponseReplicaStates(
-            Map<Integer, R> state,
+            Collection<R> state,
             OptionalInt leaderId,

Review Comment:
   nit: It seems fine to pass through `leaderId` here even for observers. 



##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -246,20 +265,30 @@ private List<ReplicaState> 
followersByDescendingFetchOffset() {
             .collect(Collectors.toList());
     }
 
-    private boolean updateEndOffset(ReplicaState state,
-                                    LogOffsetMetadata endOffsetMetadata) {
+    private void verifyEndOffsetUpdate(
+            ReplicaState state,
+            LogOffsetMetadata endOffsetMetadata
+    ) {
         state.endOffset.ifPresent(currentEndOffset -> {
             if (currentEndOffset.offset > endOffsetMetadata.offset) {
                 if (state.nodeId == localId) {
                     throw new IllegalStateException("Detected non-monotonic 
update of local " +
-                        "end offset: " + currentEndOffset.offset + " -> " + 
endOffsetMetadata.offset);
+                            "end offset: " + currentEndOffset.offset + " -> " 
+ endOffsetMetadata.offset);
                 } else {
                     log.warn("Detected non-monotonic update of fetch offset 
from nodeId {}: {} -> {}",
-                        state.nodeId, currentEndOffset.offset, 
endOffsetMetadata.offset);
+                            state.nodeId, currentEndOffset.offset, 
endOffsetMetadata.offset);
                 }
             }
         });
-
+    }
+    private boolean updateEndOffset(
+            ReplicaState state,

Review Comment:
   nit: indentation for parameters is usually 4 spaces



##########
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##########
@@ -778,4 +779,52 @@ class KRaftClusterTest {
       cluster.close()
     }
   }
+  def createAdminClient(cluster: KafkaClusterTestKit, useController: Boolean): 
Admin = {
+    var props: Properties = null
+    props = cluster.clientProperties()
+    props.put(AdminClientConfig.CLIENT_ID_CONFIG, this.getClass.getName)
+    Admin.create(props)
+  }
+
+  @Test
+  def testDescribeQuorumRequestToBrokers() : Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumBrokerNodes(4).
+        setNumControllerNodes(3).build()).build()
+    try {
+      cluster.format
+      cluster.startup
+      for (i <- 0 to 3) {
+        TestUtils.waitUntilTrue(() => cluster.brokers.get(i).brokerState == 
BrokerState.RUNNING,
+          "Broker Never started up")
+      }
+      val admin = createAdminClient(cluster, false)
+      try {
+        val quorumState = admin.describeMetadataQuorum(new 
DescribeMetadataQuorumOptions)
+        val quorumInfo = quorumState.quorumInfo.get()
+
+        assertEquals(0, quorumInfo.observers.size)
+        assertEquals(3, quorumInfo.voters.size)

Review Comment:
   You may have missed my comment above. The nice thing about asserting the set 
directly is that it ensures that all expected voters/observers are present in 
the result.



##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -290,22 +319,35 @@ private ReplicaState getReplicaState(int remoteNodeId) {
         return state;
     }
 
-    Map<Integer, Long> getVoterEndOffsets() {
-        return getReplicaEndOffsets(voterStates);
+    List<DescribeQuorumResponseData.ReplicaState> 
quorumResponseVoterStates(long currentTimeMs) {
+        return quorumResponseReplicaStates(voterStates.values(), 
OptionalInt.of(localId), currentTimeMs);
     }
 
-    Map<Integer, Long> getObserverStates(final long currentTimeMs) {
+    List<DescribeQuorumResponseData.ReplicaState> 
quorumResponseObserverStates(long currentTimeMs) {
         clearInactiveObservers(currentTimeMs);
-        return getReplicaEndOffsets(observerStates);
-    }
-
-    private static <R extends ReplicaState> Map<Integer, Long> 
getReplicaEndOffsets(
-        Map<Integer, R> replicaStates) {
-        return replicaStates.entrySet().stream()
-                   .collect(Collectors.toMap(Map.Entry::getKey,
-                       e -> e.getValue().endOffset.map(
-                           logOffsetMetadata -> 
logOffsetMetadata.offset).orElse(-1L))
-                   );
+        return quorumResponseReplicaStates(observerStates.values(), 
OptionalInt.empty(), currentTimeMs);
+    }
+
+    private static <R extends ReplicaState> 
List<DescribeQuorumResponseData.ReplicaState> quorumResponseReplicaStates(

Review Comment:
   We can remove the generic type. Both observers and voters use the same 
`ReplicaState` type.



##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -217,20 +220,36 @@ public boolean updateLocalState(long fetchTimestamp, 
LogOffsetMetadata logOffset
      * @param replicaId replica id
      * @param fetchTimestamp fetch timestamp
      * @param logOffsetMetadata new log offset and metadata
+     * @param leaderLogEndOffset current log end offset of the leader
      * @return true if the high watermark is updated too
      */
-    public boolean updateReplicaState(int replicaId,
-                                      long fetchTimestamp,
-                                      LogOffsetMetadata logOffsetMetadata) {
+    public boolean updateReplicaState(
+            int replicaId,
+            long fetchTimestamp,
+            LogOffsetMetadata logOffsetMetadata,
+            Long leaderLogEndOffset

Review Comment:
   Could we use the primitive `long`? I don't think this is intended to be 
nullable, is it?



##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -353,10 +406,12 @@ else if (!that.endOffset.isPresent())
         @Override
         public String toString() {
             return String.format(
-                "ReplicaState(nodeId=%d, endOffset=%s, lastFetchTimestamp=%s, 
hasAcknowledgedLeader=%s)",
+                "ReplicaState(nodeId=%d, endOffset=%s, lastFetchTimestamp=%s, 
" +
+                        " lastCaughtUpTimestamp=%s, hasAcknowledgedLeader=%s)",

Review Comment:
   nit: extra space at the beginning (previous line already has a space)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to