jsancio commented on code in PR #12548:
URL: https://github.com/apache/kafka/pull/12548#discussion_r953171207
##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -313,43 +312,70 @@ private ReplicaState getReplicaState(int remoteNodeId) {
return state;
}
- List<DescribeQuorumResponseData.ReplicaState> quorumResponseVoterStates(long currentTimeMs) {
- return quorumResponseReplicaStates(voterStates.values(), localId, currentTimeMs);
+ public DescribeQuorumResponseData.PartitionData describeQuorum(long currentTimeMs) {
+ clearInactiveObservers(currentTimeMs);
+
+ return new DescribeQuorumResponseData.PartitionData()
+ .setErrorCode(Errors.NONE.code())
+ .setLeaderId(localId)
+ .setLeaderEpoch(epoch)
+ .setHighWatermark(highWatermark().map(offsetMetadata -> offsetMetadata.offset).orElse(-1L))
+ .setCurrentVoters(describeReplicaStates(voterStates, currentTimeMs))
+ .setObservers(describeReplicaStates(observerStates, currentTimeMs));
}
- List<DescribeQuorumResponseData.ReplicaState> quorumResponseObserverStates(long currentTimeMs) {
- clearInactiveObservers(currentTimeMs);
- return quorumResponseReplicaStates(observerStates.values(), localId, currentTimeMs);
+ // Visible for testing
+ DescribeQuorumResponseData.ReplicaState describeVoterState(
Review Comment:
Hmm. Okay. Since we are adding this for testing only, I am wondering if we
should have tests call `describeQuorum(long)` and have a test utility that
filters and returns the `ReplicaState` for a given voter or observer.
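For example, a minimal sketch of such a utility. It models only the fields it needs, using hypothetical stand-ins (`PartitionData`, `ReplicaState`) for the generated `DescribeQuorumResponseData` classes; `findVoterState` and `findObserverState` are hypothetical helper names, not existing Kafka APIs:

```java
import java.util.List;
import java.util.Optional;

public class DescribeQuorumTestUtils {
    // Hypothetical stand-ins for the generated DescribeQuorumResponseData classes;
    // only the fields needed for the lookup are modeled here.
    record ReplicaState(int replicaId, long logEndOffset) { }

    record PartitionData(List<ReplicaState> currentVoters, List<ReplicaState> observers) { }

    // Hypothetical test helper: returns the state reported for the given voter, if any.
    static Optional<ReplicaState> findVoterState(PartitionData partition, int replicaId) {
        return findReplicaState(partition.currentVoters(), replicaId);
    }

    // Hypothetical test helper: returns the state reported for the given observer, if any.
    static Optional<ReplicaState> findObserverState(PartitionData partition, int replicaId) {
        return findReplicaState(partition.observers(), replicaId);
    }

    private static Optional<ReplicaState> findReplicaState(List<ReplicaState> states, int replicaId) {
        return states.stream()
            .filter(state -> state.replicaId() == replicaId)
            .findFirst();
    }

    public static void main(String[] args) {
        // In a real test, this would come from leaderState.describeQuorum(currentTimeMs).
        PartitionData partition = new PartitionData(
            List.of(new ReplicaState(1, 10L), new ReplicaState(2, 7L)),
            List.of(new ReplicaState(3, 5L)));
        System.out.println(findVoterState(partition, 2));
        System.out.println(findObserverState(partition, 2));
    }
}
```

With a helper like this, `describeVoterState` would not need to be exposed on `LeaderState` just for tests.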
##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -204,50 +198,80 @@ private boolean updateHighWatermark() {
return false;
}
+ private void setHighWatermark(
+ Optional<LogOffsetMetadata> newHighWatermark,
Review Comment:
Based on the call sites, it looks like `newHighWatermark.isPresent()` is always
`true`. Should we document that this method takes an
`Optional<LogOffsetMetadata>` so that we don't allocate a new object? Should
this method also "assert" that `newHighWatermark.isPresent()` is always `true`?
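A minimal sketch of what the documented precondition and check might look like, assuming a simplified `LogOffsetMetadata` stand-in with only an `offset` field (the `HighWatermarkSketch` class is hypothetical). It throws `IllegalArgumentException` rather than using a Java `assert`, since `assert` statements are disabled unless the JVM runs with `-ea`:

```java
import java.util.Optional;

public class HighWatermarkSketch {
    // Hypothetical stand-in for org.apache.kafka.raft.LogOffsetMetadata.
    record LogOffsetMetadata(long offset) { }

    private Optional<LogOffsetMetadata> highWatermark = Optional.empty();

    /**
     * Sets the high watermark.
     *
     * @param newHighWatermark must be present; it is an Optional only so that callers
     *        can pass the instance they already hold without allocating a new one
     * @throws IllegalArgumentException if newHighWatermark is empty
     */
    void setHighWatermark(Optional<LogOffsetMetadata> newHighWatermark) {
        if (!newHighWatermark.isPresent()) {
            throw new IllegalArgumentException("The new high watermark must be present");
        }
        // Store the caller's Optional as-is; no new Optional is allocated.
        highWatermark = newHighWatermark;
    }

    Optional<LogOffsetMetadata> highWatermark() {
        return highWatermark;
    }
}
```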
##########
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##########
@@ -125,15 +135,15 @@ public long logEndOffset() {
* @return The value of the lastFetchTime if known, empty otherwise
*/
public OptionalLong lastFetchTimeMs() {
Review Comment:
Let's rename this to `lastFetchTimestamp`.
##########
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##########
@@ -125,15 +135,15 @@ public long logEndOffset() {
* @return The value of the lastFetchTime if known, empty otherwise
*/
public OptionalLong lastFetchTimeMs() {
- return lastFetchTimeMs;
+ return lastFetchTimestamp;
}
/**
* Return the lastCaughtUpTime in milliseconds for this replica.
* @return The value of the lastCaughtUpTime if known, empty otherwise
*/
public OptionalLong lastCaughtUpTimeMs() {
Review Comment:
Let's rename this to `lastCaughtUpTimestamp`.
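A minimal sketch of the renamed accessors, assuming (as the `LeaderState.ReplicaState` change in this PR suggests) that the raw value uses `-1` for "unknown". The `ReplicaStateSketch` class and its `toOptional` helper are hypothetical, not the actual `QuorumInfo` code:

```java
import java.util.OptionalLong;

public class ReplicaStateSketch {
    private final OptionalLong lastFetchTimestamp;
    private final OptionalLong lastCaughtUpTimestamp;

    ReplicaStateSketch(long lastFetchTimestamp, long lastCaughtUpTimestamp) {
        this.lastFetchTimestamp = toOptional(lastFetchTimestamp);
        this.lastCaughtUpTimestamp = toOptional(lastCaughtUpTimestamp);
    }

    // Hypothetical helper: treat the -1 sentinel as "unknown".
    static OptionalLong toOptional(long value) {
        return value == -1 ? OptionalLong.empty() : OptionalLong.of(value);
    }

    /**
     * Return the last fetch timestamp in milliseconds for this replica.
     * @return The value of the lastFetchTimestamp if known, empty otherwise
     */
    public OptionalLong lastFetchTimestamp() {
        return lastFetchTimestamp;
    }

    /**
     * Return the last caught-up timestamp in milliseconds for this replica.
     * @return The value of the lastCaughtUpTimestamp if known, empty otherwise
     */
    public OptionalLong lastCaughtUpTimestamp() {
        return lastCaughtUpTimestamp;
    }
}
```

Naming the accessors after the fields also keeps them consistent with the `lastFetchTimestamp` and `lastCaughtUpTimestamp` fields in `LeaderState.ReplicaState`.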
##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -359,31 +385,46 @@ private boolean isVoter(int remoteNodeId) {
private static class ReplicaState implements Comparable<ReplicaState> {
final int nodeId;
Optional<LogOffsetMetadata> endOffset;
- OptionalLong lastFetchTimestamp;
- OptionalLong lastFetchLeaderLogEndOffset;
- OptionalLong lastCaughtUpTimestamp;
+ long lastFetchTimestamp;
+ long lastFetchLeaderLogEndOffset;
+ long lastCaughtUpTimestamp;
boolean hasAcknowledgedLeader;
public ReplicaState(int nodeId, boolean hasAcknowledgedLeader) {
this.nodeId = nodeId;
this.endOffset = Optional.empty();
- this.lastFetchTimestamp = OptionalLong.empty();
- this.lastFetchLeaderLogEndOffset = OptionalLong.empty();
- this.lastCaughtUpTimestamp = OptionalLong.empty();
+ this.lastFetchTimestamp = -1;
+ this.lastFetchLeaderLogEndOffset = -1;
+ this.lastCaughtUpTimestamp = -1;
this.hasAcknowledgedLeader = hasAcknowledgedLeader;
}
- void updateFetchTimestamp(long currentFetchTimeMs, long leaderLogEndOffset) {
- // To be resilient to system time shifts we do not strictly
- // require the timestamp be monotonically increasing.
- lastFetchTimestamp = OptionalLong.of(Math.max(lastFetchTimestamp.orElse(-1L), currentFetchTimeMs));
- lastFetchLeaderLogEndOffset = OptionalLong.of(leaderLogEndOffset);
+ void updateLeaderState(
+ LogOffsetMetadata endOffsetMetadata
+ ) {
+ // For the leader, we only update the end offset. The remaining fields
+ // (such as the caught up time) are determined implicitly.
+ this.endOffset = Optional.of(endOffsetMetadata);
}
- void updateLastCaughtUpTimestamp(long lastCaughtUpTime) {
- // This value relies on the fetch timestamp which does not
- // require monotonicity
- lastCaughtUpTimestamp = OptionalLong.of(Math.max(lastCaughtUpTimestamp.orElse(-1L), lastCaughtUpTime));
+ void updateFollowerState(
+ long currentTimeMs,
+ LogOffsetMetadata fetchOffsetMetadata,
+ Optional<LogOffsetMetadata> leaderEndOffsetOpt
+ ) {
+ leaderEndOffsetOpt.ifPresent(leaderEndOffset -> {
+ if (fetchOffsetMetadata.offset >= leaderEndOffset.offset) {
+ lastCaughtUpTimestamp = Math.max(lastCaughtUpTimestamp, currentTimeMs);
+ } else if (lastFetchLeaderLogEndOffset > 0
+ && fetchOffsetMetadata.offset >= lastFetchLeaderLogEndOffset) {
+ lastCaughtUpTimestamp = Math.max(lastCaughtUpTimestamp, lastFetchTimestamp);
+ }
+ lastFetchLeaderLogEndOffset = leaderEndOffset.offset;
Review Comment:
What do you think about writing a comment explaining that the order of these
two statements is important? For example, if `lastFetchLeaderLogEndOffset =
leaderEndOffset.offset` is moved before the `if`, this algorithm doesn't work.
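A comment along these lines might capture it. This is a simplified, self-contained sketch: the class name `FollowerCaughtUpSketch` and the use of `OptionalLong` and plain `long` offsets in place of `Optional<LogOffsetMetadata>` are assumptions, and the final `lastFetchTimestamp` update is assumed because the hunk above is cut off:

```java
import java.util.OptionalLong;

public class FollowerCaughtUpSketch {
    // Simplified stand-in for LeaderState.ReplicaState: -1 means "unknown".
    long lastFetchTimestamp = -1;
    long lastFetchLeaderLogEndOffset = -1;
    long lastCaughtUpTimestamp = -1;

    void updateFollowerState(long currentTimeMs, long fetchOffset, OptionalLong leaderEndOffsetOpt) {
        leaderEndOffsetOpt.ifPresent(leaderEndOffset -> {
            // NOTE: order matters. The branches below must read the values recorded by
            // the *previous* fetch (lastFetchLeaderLogEndOffset and lastFetchTimestamp)
            // before they are overwritten for the current fetch.
            if (fetchOffset >= leaderEndOffset) {
                // Caught up to the leader's current end offset: caught up as of now.
                lastCaughtUpTimestamp = Math.max(lastCaughtUpTimestamp, currentTimeMs);
            } else if (lastFetchLeaderLogEndOffset > 0 && fetchOffset >= lastFetchLeaderLogEndOffset) {
                // Reached the leader's end offset as of the previous fetch, so the
                // follower was caught up as of that previous fetch's time.
                lastCaughtUpTimestamp = Math.max(lastCaughtUpTimestamp, lastFetchTimestamp);
            }
            // Only now record the leader's end offset for use by the next fetch.
            lastFetchLeaderLogEndOffset = leaderEndOffset;
        });
        // Assumed (the hunk is cut off): the fetch time is also updated after the checks.
        lastFetchTimestamp = Math.max(lastFetchTimestamp, currentTimeMs);
    }
}
```

Tracing fetches at times 1000, 2000, and 3000 with fetch offsets 0, 10, 15 against leader end offsets 10, 15, 15: the second fetch reaches the previous leader end offset (10), so `lastCaughtUpTimestamp` becomes 1000, and the third fetch sets it to 3000. If `lastFetchLeaderLogEndOffset = leaderEndOffset` ran before the `if`, the second fetch would compare 10 against 15 and `lastCaughtUpTimestamp` would stay at -1.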
--
This is an automated message from the Apache Git Service.