hachikuji commented on code in PR #12508: URL: https://github.com/apache/kafka/pull/12508#discussion_r947244567
########## raft/src/main/java/org/apache/kafka/raft/LeaderState.java: ########## @@ -319,57 +320,34 @@ private ReplicaState getReplicaState(int remoteNodeId) { } List<DescribeQuorumResponseData.ReplicaState> quorumResponseVoterStates(long currentTimeMs) { - return quorumResponseReplicaStates(voterStates, OptionalInt.of(localId), currentTimeMs); + return quorumResponseReplicaStates(voterStates.values(), OptionalInt.of(localId), currentTimeMs); } List<DescribeQuorumResponseData.ReplicaState> quorumResponseObserverStates(long currentTimeMs) { clearInactiveObservers(currentTimeMs); - return quorumResponseReplicaStates(observerStates, OptionalInt.empty(), currentTimeMs); + return quorumResponseReplicaStates(observerStates.values(), OptionalInt.empty(), currentTimeMs); } private static <R extends ReplicaState> List<DescribeQuorumResponseData.ReplicaState> quorumResponseReplicaStates( - Map<Integer, R> state, + Collection<R> state, OptionalInt leaderId, Review Comment: nit: It seems fine to pass through `leaderId` here even for observers. ########## raft/src/main/java/org/apache/kafka/raft/LeaderState.java: ########## @@ -246,20 +265,30 @@ private List<ReplicaState> followersByDescendingFetchOffset() { .collect(Collectors.toList()); } - private boolean updateEndOffset(ReplicaState state, - LogOffsetMetadata endOffsetMetadata) { + private void verifyEndOffsetUpdate( + ReplicaState state, + LogOffsetMetadata endOffsetMetadata + ) { state.endOffset.ifPresent(currentEndOffset -> { if (currentEndOffset.offset > endOffsetMetadata.offset) { if (state.nodeId == localId) { throw new IllegalStateException("Detected non-monotonic update of local " + - "end offset: " + currentEndOffset.offset + " -> " + endOffsetMetadata.offset); + "end offset: " + currentEndOffset.offset + " -> " + endOffsetMetadata.offset); } else { log.warn("Detected non-monotonic update of fetch offset from nodeId {}: {} -> {}", - state.nodeId, currentEndOffset.offset, endOffsetMetadata.offset); + state.nodeId, currentEndOffset.offset, endOffsetMetadata.offset); } } }); - + } + private boolean updateEndOffset( + ReplicaState state, Review Comment: nit: indentation for parameters is usually 4 spaces ########## core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala: ########## @@ -778,4 +779,52 @@ class KRaftClusterTest { cluster.close() } } + def createAdminClient(cluster: KafkaClusterTestKit, useController: Boolean): Admin = { + var props: Properties = null + props = cluster.clientProperties() + props.put(AdminClientConfig.CLIENT_ID_CONFIG, this.getClass.getName) + Admin.create(props) + } + + @Test + def testDescribeQuorumRequestToBrokers() : Unit = { + val cluster = new KafkaClusterTestKit.Builder( + new TestKitNodes.Builder(). + setNumBrokerNodes(4). + setNumControllerNodes(3).build()).build() + try { + cluster.format + cluster.startup + for (i <- 0 to 3) { + TestUtils.waitUntilTrue(() => cluster.brokers.get(i).brokerState == BrokerState.RUNNING, + "Broker Never started up") + } + val admin = createAdminClient(cluster, false) + try { + val quorumState = admin.describeMetadataQuorum(new DescribeMetadataQuorumOptions) + val quorumInfo = quorumState.quorumInfo.get() + + assertEquals(0, quorumInfo.observers.size) + assertEquals(3, quorumInfo.voters.size) Review Comment: You may have missed my comment above. The nice thing about asserting the set directly is that it ensures that all expected voters/observers are present in the result. ########## raft/src/main/java/org/apache/kafka/raft/LeaderState.java: ########## @@ -290,22 +319,35 @@ private ReplicaState getReplicaState(int remoteNodeId) { return state; } - Map<Integer, Long> getVoterEndOffsets() { - return getReplicaEndOffsets(voterStates); + List<DescribeQuorumResponseData.ReplicaState> quorumResponseVoterStates(long currentTimeMs) { + return quorumResponseReplicaStates(voterStates.values(), OptionalInt.of(localId), currentTimeMs); } - Map<Integer, Long> getObserverStates(final long currentTimeMs) { + List<DescribeQuorumResponseData.ReplicaState> quorumResponseObserverStates(long currentTimeMs) { clearInactiveObservers(currentTimeMs); - return getReplicaEndOffsets(observerStates); - } - - private static <R extends ReplicaState> Map<Integer, Long> getReplicaEndOffsets( - Map<Integer, R> replicaStates) { - return replicaStates.entrySet().stream() - .collect(Collectors.toMap(Map.Entry::getKey, - e -> e.getValue().endOffset.map( - logOffsetMetadata -> logOffsetMetadata.offset).orElse(-1L)) - ); + return quorumResponseReplicaStates(observerStates.values(), OptionalInt.empty(), currentTimeMs); + } + + private static <R extends ReplicaState> List<DescribeQuorumResponseData.ReplicaState> quorumResponseReplicaStates( Review Comment: We can remove the generic type. Both observers and voters use the same `ReplicaState` type. ########## raft/src/main/java/org/apache/kafka/raft/LeaderState.java: ########## @@ -217,20 +220,36 @@ public boolean updateLocalState(long fetchTimestamp, LogOffsetMetadata logOffset * @param replicaId replica id * @param fetchTimestamp fetch timestamp * @param logOffsetMetadata new log offset and metadata + * @param leaderLogEndOffset current log end offset of the leader * @return true if the high watermark is updated too */ - public boolean updateReplicaState(int replicaId, - long fetchTimestamp, - LogOffsetMetadata logOffsetMetadata) { + public boolean updateReplicaState( + int replicaId, + long fetchTimestamp, + LogOffsetMetadata logOffsetMetadata, + Long leaderLogEndOffset Review Comment: Could we use the primitive `long`? I don't think this is intended to be nullable, is it? ########## raft/src/main/java/org/apache/kafka/raft/LeaderState.java: ########## @@ -353,10 +406,12 @@ else if (!that.endOffset.isPresent()) @Override public String toString() { return String.format( - "ReplicaState(nodeId=%d, endOffset=%s, lastFetchTimestamp=%s, hasAcknowledgedLeader=%s)", + "ReplicaState(nodeId=%d, endOffset=%s, lastFetchTimestamp=%s, " + + " lastCaughtUpTimestamp=%s, hasAcknowledgedLeader=%s)", Review Comment: nit: extra space at the beginning (previous line already has a space) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org