hachikuji commented on code in PR #12548:
URL: https://github.com/apache/kafka/pull/12548#discussion_r953179732
##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -359,31 +385,46 @@ private boolean isVoter(int remoteNodeId) {
private static class ReplicaState implements Comparable<ReplicaState> {
final int nodeId;
Optional<LogOffsetMetadata> endOffset;
- OptionalLong lastFetchTimestamp;
- OptionalLong lastFetchLeaderLogEndOffset;
- OptionalLong lastCaughtUpTimestamp;
+ long lastFetchTimestamp;
+ long lastFetchLeaderLogEndOffset;
+ long lastCaughtUpTimestamp;
boolean hasAcknowledgedLeader;
public ReplicaState(int nodeId, boolean hasAcknowledgedLeader) {
this.nodeId = nodeId;
this.endOffset = Optional.empty();
- this.lastFetchTimestamp = OptionalLong.empty();
- this.lastFetchLeaderLogEndOffset = OptionalLong.empty();
- this.lastCaughtUpTimestamp = OptionalLong.empty();
+ this.lastFetchTimestamp = -1;
+ this.lastFetchLeaderLogEndOffset = -1;
+ this.lastCaughtUpTimestamp = -1;
this.hasAcknowledgedLeader = hasAcknowledgedLeader;
}
- void updateFetchTimestamp(long currentFetchTimeMs, long
leaderLogEndOffset) {
- // To be resilient to system time shifts we do not strictly
- // require the timestamp be monotonically increasing.
- lastFetchTimestamp =
OptionalLong.of(Math.max(lastFetchTimestamp.orElse(-1L), currentFetchTimeMs));
- lastFetchLeaderLogEndOffset = OptionalLong.of(leaderLogEndOffset);
+ void updateLeaderState(
+ LogOffsetMetadata endOffsetMetadata
+ ) {
+ // For the leader, we only update the end offset. The remaining
fields
+ // (such as the caught up time) are determined implicitly.
+ this.endOffset = Optional.of(endOffsetMetadata);
}
- void updateLastCaughtUpTimestamp(long lastCaughtUpTime) {
- // This value relies on the fetch timestamp which does not
- // require monotonicity
- lastCaughtUpTimestamp =
OptionalLong.of(Math.max(lastCaughtUpTimestamp.orElse(-1L), lastCaughtUpTime));
+ void updateFollowerState(
+ long currentTimeMs,
+ LogOffsetMetadata fetchOffsetMetadata,
+ Optional<LogOffsetMetadata> leaderEndOffsetOpt
+ ) {
+ leaderEndOffsetOpt.ifPresent(leaderEndOffset -> {
+ if (fetchOffsetMetadata.offset >= leaderEndOffset.offset) {
+ lastCaughtUpTimestamp = Math.max(lastCaughtUpTimestamp,
currentTimeMs);
+ } else if (lastFetchLeaderLogEndOffset > 0
+ && fetchOffsetMetadata.offset >=
lastFetchLeaderLogEndOffset) {
+ lastCaughtUpTimestamp = Math.max(lastCaughtUpTimestamp,
lastFetchTimestamp);
+ }
+ lastFetchLeaderLogEndOffset = leaderEndOffset.offset;
Review Comment:
Yeah, good idea.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]