This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c0a092f5626 MINOR: Cleanups in raft module (#17877)
c0a092f5626 is described below

commit c0a092f56262fcc32534396f5d89a6bfa32064e5
Author: Mickael Maison <[email protected]>
AuthorDate: Thu Nov 21 15:19:07 2024 +0100

    MINOR: Cleanups in raft module (#17877)
    
    
    Reviewers: Yash Mayya <[email protected]>
---
 .../java/org/apache/kafka/raft/ElectionState.java  |  8 +++----
 .../java/org/apache/kafka/raft/FollowerState.java  |  4 ++--
 .../org/apache/kafka/raft/KafkaRaftClient.java     | 28 +++++++++++-----------
 .../java/org/apache/kafka/raft/LeaderState.java    |  7 +++---
 .../java/org/apache/kafka/raft/QuorumState.java    |  8 +++----
 .../org/apache/kafka/raft/UnattachedState.java     |  2 +-
 .../kafka/raft/internals/AddVoterHandler.java      | 10 ++++----
 .../kafka/raft/internals/AddVoterHandlerState.java |  2 +-
 .../kafka/raft/internals/RecordsBatchReader.java   |  2 +-
 .../kafka/raft/internals/RecordsIterator.java      |  8 +++----
 .../kafka/raft/internals/RemoveVoterHandler.java   |  6 ++---
 .../kafka/raft/internals/UpdateVoterHandler.java   |  8 +++----
 .../kafka/snapshot/RecordsSnapshotReader.java      |  6 ++---
 .../kafka/snapshot/RecordsSnapshotWriter.java      |  2 +-
 .../java/org/apache/kafka/raft/EndpointsTest.java  |  4 ++--
 .../kafka/raft/KafkaRaftClientReconfigTest.java    | 18 +++++++-------
 .../test/java/org/apache/kafka/raft/MockLog.java   |  2 +-
 .../org/apache/kafka/raft/MockNetworkChannel.java  |  2 +-
 .../apache/kafka/raft/RaftEventSimulationTest.java |  4 ++--
 .../org/apache/kafka/raft/ReplicatedCounter.java   |  2 +-
 .../kafka/raft/internals/BatchBuilderTest.java     |  2 +-
 21 files changed, 67 insertions(+), 68 deletions(-)

diff --git a/raft/src/main/java/org/apache/kafka/raft/ElectionState.java 
b/raft/src/main/java/org/apache/kafka/raft/ElectionState.java
index e65e72890f5..675436cc52c 100644
--- a/raft/src/main/java/org/apache/kafka/raft/ElectionState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/ElectionState.java
@@ -73,11 +73,11 @@ public final class ElectionState {
     public boolean isVotedCandidate(ReplicaKey nodeKey) {
         if (nodeKey.id() < 0) {
             throw new IllegalArgumentException("Invalid node key " + nodeKey);
-        } else if (!votedKey.isPresent()) {
+        } else if (votedKey.isEmpty()) {
             return false;
         } else if (votedKey.get().id() != nodeKey.id()) {
             return false;
-        } else if (!votedKey.get().directoryId().isPresent()) {
+        } else if (votedKey.get().directoryId().isEmpty()) {
             // when the persisted voted directory id is not present assume that we voted for this candidate;
             // this happens when the kraft version is 0.
             return true;
@@ -87,7 +87,7 @@ public final class ElectionState {
     }
 
     public int leaderId() {
-        if (!leaderId.isPresent())
+        if (leaderId.isEmpty())
             throw new IllegalStateException("Attempt to access nil leaderId");
         return leaderId.getAsInt();
     }
@@ -101,7 +101,7 @@ public final class ElectionState {
     }
 
     public ReplicaKey votedKey() {
-        if (!votedKey.isPresent()) {
+        if (votedKey.isEmpty()) {
             throw new IllegalStateException("Attempt to access nil votedId");
         }
 
diff --git a/raft/src/main/java/org/apache/kafka/raft/FollowerState.java 
b/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
index 49eecab5d61..67748f54adc 100644
--- a/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
@@ -131,7 +131,7 @@ public class FollowerState implements EpochState {
     private long updateVoterPeriodMs() {
         // Allow for a few rounds of fetch request before attempting to update
         // the voter state
-        return fetchTimeoutMs * 3;
+        return fetchTimeoutMs * 3L;
     }
 
     public boolean hasUpdateVoterPeriodExpired(long currentTimeMs) {
@@ -150,7 +150,7 @@ public class FollowerState implements EpochState {
     }
 
     public boolean updateHighWatermark(OptionalLong newHighWatermark) {
-        if (!newHighWatermark.isPresent() && highWatermark.isPresent()) {
+        if (newHighWatermark.isEmpty() && highWatermark.isPresent()) {
             throw new IllegalArgumentException(
                 String.format("Attempt to overwrite current high watermark %s with unknown value", highWatermark)
             );
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java 
b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 51aa5e59f2f..cf4612355f4 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -1210,7 +1210,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
         int position = 0;
         for (ReplicaKey candidate : preferredCandidates) {
             if (candidate.id() == quorum.localIdOrThrow()) {
-                if (!candidate.directoryId().isPresent() ||
+                if (candidate.directoryId().isEmpty() ||
                     
candidate.directoryId().get().equals(quorum.localDirectoryId())
                 ) {
                     // Found ourselves in the preferred candidate list
@@ -1788,7 +1788,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
 
         Optional<FetchSnapshotRequestData.PartitionSnapshot> 
partitionSnapshotOpt = FetchSnapshotRequest
             .forTopicPartition(data, log.topicPartition());
-        if (!partitionSnapshotOpt.isPresent()) {
+        if (partitionSnapshotOpt.isEmpty()) {
             // The Raft client assumes that there is only one topic partition.
             TopicPartition unknownTopicPartition = new TopicPartition(
                 data.topics().get(0).name(),
@@ -1828,7 +1828,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
         );
 
         Optional<RawSnapshotReader> snapshotOpt = log.readSnapshot(snapshotId);
-        if (!snapshotOpt.isPresent() || snapshotId.equals(BOOTSTRAP_SNAPSHOT_ID)) {
+        if (snapshotOpt.isEmpty() || snapshotId.equals(BOOTSTRAP_SNAPSHOT_ID)) {
             // The bootstrap checkpoint should not be replicated. The first leader will
             // make sure that the content of the bootstrap checkpoint is included in the
             // partition log
@@ -1944,7 +1944,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
 
         Optional<FetchSnapshotResponseData.PartitionSnapshot> 
partitionSnapshotOpt = FetchSnapshotResponse
             .forTopicPartition(data, log.topicPartition());
-        if (!partitionSnapshotOpt.isPresent()) {
+        if (partitionSnapshotOpt.isEmpty()) {
             return false;
         }
 
@@ -2098,7 +2098,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
         }
 
         Optional<ReplicaKey> newVoter = RaftUtil.addVoterRequestVoterKey(data);
-        if (!newVoter.isPresent() || !newVoter.get().directoryId().isPresent()) {
+        if (newVoter.isEmpty() || newVoter.get().directoryId().isEmpty()) {
             return completedFuture(
                 new AddRaftVoterResponseData()
                     .setErrorCode(Errors.INVALID_REQUEST.code())
@@ -2107,7 +2107,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
         }
 
         Endpoints newVoterEndpoints = 
Endpoints.fromAddVoterRequest(data.listeners());
-        if (!newVoterEndpoints.address(channel.listenerName()).isPresent()) {
+        if (newVoterEndpoints.address(channel.listenerName()).isEmpty()) {
             return completedFuture(
                 new AddRaftVoterResponseData()
                     .setErrorCode(Errors.INVALID_REQUEST.code())
@@ -2181,7 +2181,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
         }
 
         Optional<ReplicaKey> oldVoter = 
RaftUtil.removeVoterRequestVoterKey(data);
-        if (!oldVoter.isPresent() || !oldVoter.get().directoryId().isPresent()) {
+        if (oldVoter.isEmpty() || oldVoter.get().directoryId().isEmpty()) {
             return completedFuture(
                 new RemoveRaftVoterResponseData()
                     .setErrorCode(Errors.INVALID_REQUEST.code())
@@ -2226,7 +2226,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
         }
 
         Optional<ReplicaKey> voter = RaftUtil.updateVoterRequestVoterKey(data);
-        if (!voter.isPresent() || !voter.get().directoryId().isPresent()) {
+        if (voter.isEmpty() || voter.get().directoryId().isEmpty()) {
             return completedFuture(
                 RaftUtil.updateVoterResponse(
                     Errors.INVALID_REQUEST,
@@ -2238,7 +2238,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
         }
 
         Endpoints voterEndpoints = 
Endpoints.fromUpdateVoterRequest(data.listeners());
-        if (!voterEndpoints.address(channel.listenerName()).isPresent()) {
+        if (voterEndpoints.address(channel.listenerName()).isEmpty()) {
             return completedFuture(
                 RaftUtil.updateVoterResponse(
                     Errors.INVALID_REQUEST,
@@ -2319,8 +2319,8 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
             return quorum.isLeader();
         } else {
             return epoch != quorum.epoch()
-                || !leaderId.isPresent()
-                || !quorum.leaderId().isPresent()
+                || leaderId.isEmpty()
+                || quorum.leaderId().isEmpty()
                 || leaderId.equals(quorum.leaderId());
         }
     }
@@ -2516,7 +2516,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
         return voterKey
             .map(key -> {
                 if (!OptionalInt.of(key.id()).equals(nodeId)) return false;
-                if (!key.directoryId().isPresent()) return true;
+                if (key.directoryId().isEmpty()) return true;
 
                 return key.directoryId().get().equals(nodeDirectoryId);
             })
@@ -3399,7 +3399,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
             // Note that if we transition to another state before we have a chance to
             // request resignation, then we consider the call fulfilled.
             Optional<LeaderState<Object>> leaderStateOpt = 
quorum.maybeLeaderState();
-            if (!leaderStateOpt.isPresent()) {
+            if (leaderStateOpt.isEmpty()) {
                 logger.debug("Ignoring call to resign from epoch {} since this node is " +
                     "no longer the leader", epoch);
                 return;
@@ -3702,7 +3702,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
                 return true;
             } else {
                 return leaderAndEpoch.leaderId().isPresent() &&
-                    !lastFiredLeaderChange.leaderId().isPresent();
+                    lastFiredLeaderChange.leaderId().isEmpty();
             }
         }
 
diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java 
b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
index c09282c87c9..0024c849b21 100644
--- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
@@ -35,7 +35,6 @@ import org.apache.kafka.server.common.KRaftVersion;
 import org.slf4j.Logger;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -112,7 +111,7 @@ public class LeaderState<T> implements EpochState {
                 new ReplicaState(voterNode.voterKey(), hasAcknowledgedLeader, 
voterNode.listeners())
             );
         }
-        this.grantingVoters = Collections.unmodifiableSet(new HashSet<>(grantingVoters));
+        this.grantingVoters = Set.copyOf(grantingVoters);
         this.log = logContext.logger(LeaderState.class);
         this.accumulator = Objects.requireNonNull(accumulator, "accumulator 
must be non-null");
         // use the 1.5x of fetch timeout to tolerate some network transition time or other IO time.
@@ -809,9 +808,9 @@ public class LeaderState<T> implements EpochState {
         public int compareTo(ReplicaState that) {
             if (this.endOffset.equals(that.endOffset))
                 return this.replicaKey.compareTo(that.replicaKey);
-            else if (!this.endOffset.isPresent())
+            else if (this.endOffset.isEmpty())
                 return 1;
-            else if (!that.endOffset.isPresent())
+            else if (that.endOffset.isEmpty())
                 return -1;
             else
                 return Long.compare(that.endOffset.get().offset(), 
this.endOffset.get().offset());
diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java 
b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
index 0598ce062df..de82977583a 100644
--- a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
@@ -140,7 +140,7 @@ public class QuorumState {
         ElectionState election = readElectionState();
 
         final EpochState initialState;
-        if (election.hasVoted() && !localId.isPresent()) {
+        if (election.hasVoted() && localId.isEmpty()) {
             throw new IllegalStateException(
                 String.format(
                     "Initialized quorum state (%s) with a voted candidate but without a local id",
@@ -332,7 +332,7 @@ public class QuorumState {
     }
 
     public boolean isVoter() {
-        if (!localId.isPresent()) {
+        if (localId.isEmpty()) {
             return false;
         }
 
@@ -425,7 +425,7 @@ public class QuorumState {
                     epoch
                 )
             );
-        } else if (!localId.isPresent()) {
+        } else if (localId.isEmpty()) {
             throw new IllegalStateException("Cannot transition to voted without a replica id");
         } else if (epoch < currentEpoch) {
             throw new IllegalStateException(
@@ -707,7 +707,7 @@ public class QuorumState {
     }
 
     public boolean isUnattachedNotVoted() {
-        return maybeUnattachedState().filter(unattached -> !unattached.votedKey().isPresent()).isPresent();
+        return maybeUnattachedState().filter(unattached -> unattached.votedKey().isEmpty()).isPresent();
     }
 
     public boolean isUnattachedAndVoted() {
diff --git a/raft/src/main/java/org/apache/kafka/raft/UnattachedState.java 
b/raft/src/main/java/org/apache/kafka/raft/UnattachedState.java
index 4b21849f818..f41c6b8d563 100644
--- a/raft/src/main/java/org/apache/kafka/raft/UnattachedState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/UnattachedState.java
@@ -123,7 +123,7 @@ public class UnattachedState implements EpochState {
         if (votedKey.isPresent()) {
             ReplicaKey votedReplicaKey = votedKey.get();
             if (votedReplicaKey.id() == candidateKey.id()) {
-                return !votedReplicaKey.directoryId().isPresent() || votedReplicaKey.directoryId().equals(candidateKey.directoryId());
+                return votedReplicaKey.directoryId().isEmpty() || votedReplicaKey.directoryId().equals(candidateKey.directoryId());
             }
             log.debug(
                 "Rejecting vote request from candidate ({}), already have voted for another " +
diff --git 
a/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandler.java 
b/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandler.java
index 44b9eb2a39d..1f7ea2f61c4 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandler.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandler.java
@@ -101,7 +101,7 @@ public final class AddVoterHandler {
 
         // Check that the leader has established a HWM and committed the current epoch
         Optional<Long> highWatermark = 
leaderState.highWatermark().map(LogOffsetMetadata::offset);
-        if (!highWatermark.isPresent()) {
+        if (highWatermark.isEmpty()) {
             return CompletableFuture.completedFuture(
                 RaftUtil.addVoterResponse(
                     Errors.REQUEST_TIMED_OUT,
@@ -127,7 +127,7 @@ public final class AddVoterHandler {
 
         // Check that there are no uncommitted VotersRecord
         Optional<LogHistory.Entry<VoterSet>> votersEntry = 
partitionState.lastVoterSetEntry();
-        if (!votersEntry.isPresent() || votersEntry.get().offset() >= highWatermark.get()) {
+        if (votersEntry.isEmpty() || votersEntry.get().offset() >= highWatermark.get()) {
             return CompletableFuture.completedFuture(
                 RaftUtil.addVoterResponse(
                     Errors.REQUEST_TIMED_OUT,
@@ -172,7 +172,7 @@ public final class AddVoterHandler {
             this::buildApiVersionsRequest,
             currentTimeMs
         );
-        if (!timeout.isPresent()) {
+        if (timeout.isEmpty()) {
             return CompletableFuture.completedFuture(
                 RaftUtil.addVoterResponse(
                     Errors.REQUEST_TIMED_OUT,
@@ -203,7 +203,7 @@ public final class AddVoterHandler {
         long currentTimeMs
     ) {
         Optional<AddVoterHandlerState> handlerState = 
leaderState.addVoterHandlerState();
-        if (!handlerState.isPresent()) {
+        if (handlerState.isEmpty()) {
             // There are no pending add operation just ignore the api response
             return true;
         }
@@ -242,7 +242,7 @@ public final class AddVoterHandler {
             return false;
         }
 
-        // Check that the new voter supports the kraft.verion for reconfiguration
+        // Check that the new voter supports the kraft.version for reconfiguration
         KRaftVersion kraftVersion = partitionState.lastKraftVersion();
         if (!validVersionRange(kraftVersion, supportedKraftVersions)) {
             logger.info(
diff --git 
a/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandlerState.java 
b/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandlerState.java
index c403d0e0cd2..b43197c273d 100644
--- 
a/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandlerState.java
+++ 
b/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandlerState.java
@@ -49,7 +49,7 @@ public final class AddVoterHandlerState {
     }
 
     public boolean expectingApiResponse(int replicaId) {
-        return !lastOffset.isPresent() && replicaId == voterKey.id();
+        return lastOffset.isEmpty() && replicaId == voterKey.id();
     }
 
     public void setLastOffset(long lastOffset) {
diff --git 
a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java 
b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java
index a9f4e106aa1..64572b6bc49 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java
@@ -51,7 +51,7 @@ public final class RecordsBatchReader<T> implements 
BatchReader<T> {
     public boolean hasNext() {
         ensureOpen();
 
-        if (!nextBatch.isPresent()) {
+        if (nextBatch.isEmpty()) {
             nextBatch = nextBatch();
         }
 
diff --git 
a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java 
b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java
index 82991389748..2ea3126de77 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java
@@ -86,7 +86,7 @@ public final class RecordsIterator<T> implements 
Iterator<Batch<T>>, AutoCloseab
     public boolean hasNext() {
         ensureOpen();
 
-        if (!nextBatch.isPresent()) {
+        if (nextBatch.isEmpty()) {
             nextBatch = nextBatch();
         }
 
@@ -334,7 +334,7 @@ public final class RecordsIterator<T> implements 
Iterator<Batch<T>>, AutoCloseab
             throw new IllegalArgumentException("Got key in the record when no key was expected");
         }
 
-        if (!value.isPresent()) {
+        if (value.isEmpty()) {
             throw new IllegalArgumentException("Missing value in the record when a value was expected");
         } else if (value.get().remaining() == 0) {
             throw new IllegalArgumentException("Got an unexpected empty value in the record");
@@ -346,13 +346,13 @@ public final class RecordsIterator<T> implements 
Iterator<Batch<T>>, AutoCloseab
     }
 
     private static ControlRecord decodeControlRecord(Optional<ByteBuffer> key, 
Optional<ByteBuffer> value) {
-        if (!key.isPresent()) {
+        if (key.isEmpty()) {
             throw new IllegalArgumentException("Missing key in the record when a key was expected");
         } else if (key.get().remaining() == 0) {
             throw new IllegalArgumentException("Got an unexpected empty key in the record");
         }
 
-        if (!value.isPresent()) {
+        if (value.isEmpty()) {
             throw new IllegalArgumentException("Missing value in the record when a value was expected");
         } else if (value.get().remaining() == 0) {
             throw new IllegalArgumentException("Got an unexpected empty value in the record");
diff --git 
a/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandler.java 
b/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandler.java
index 29093cc30b6..2dea86d593b 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandler.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandler.java
@@ -91,7 +91,7 @@ public final class RemoveVoterHandler {
 
         // Check that the leader has established a HWM and committed the current epoch
         Optional<Long> highWatermark = 
leaderState.highWatermark().map(LogOffsetMetadata::offset);
-        if (!highWatermark.isPresent()) {
+        if (highWatermark.isEmpty()) {
             return CompletableFuture.completedFuture(
                 RaftUtil.removeVoterResponse(
                     Errors.REQUEST_TIMED_OUT,
@@ -117,7 +117,7 @@ public final class RemoveVoterHandler {
 
         // Check that there are no uncommitted VotersRecord
         Optional<LogHistory.Entry<VoterSet>> votersEntry = 
partitionState.lastVoterSetEntry();
-        if (!votersEntry.isPresent() || votersEntry.get().offset() >= highWatermark.get()) {
+        if (votersEntry.isEmpty() || votersEntry.get().offset() >= highWatermark.get()) {
             return CompletableFuture.completedFuture(
                 RaftUtil.removeVoterResponse(
                     Errors.REQUEST_TIMED_OUT,
@@ -132,7 +132,7 @@ public final class RemoveVoterHandler {
 
         // Remove the voter from the set of voters
         Optional<VoterSet> newVoters = 
votersEntry.get().value().removeVoter(voterKey);
-        if (!newVoters.isPresent()) {
+        if (newVoters.isEmpty()) {
             return CompletableFuture.completedFuture(
                 RaftUtil.removeVoterResponse(
                     Errors.VOTER_NOT_FOUND,
diff --git 
a/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandler.java 
b/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandler.java
index 417c1decad7..335e1b02a22 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandler.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandler.java
@@ -97,7 +97,7 @@ public final class UpdateVoterHandler {
 
         // Check that the leader has established a HWM and committed the current epoch
         Optional<Long> highWatermark = 
leaderState.highWatermark().map(LogOffsetMetadata::offset);
-        if (!highWatermark.isPresent()) {
+        if (highWatermark.isEmpty()) {
             return CompletableFuture.completedFuture(
                 RaftUtil.updateVoterResponse(
                     Errors.REQUEST_TIMED_OUT,
@@ -130,7 +130,7 @@ public final class UpdateVoterHandler {
 
         // Check that there are no uncommitted VotersRecord
         Optional<LogHistory.Entry<VoterSet>> votersEntry = 
partitionState.lastVoterSetEntry();
-        if (!votersEntry.isPresent() || votersEntry.get().offset() >= highWatermark.get()) {
+        if (votersEntry.isEmpty() || votersEntry.get().offset() >= highWatermark.get()) {
             return CompletableFuture.completedFuture(
                 RaftUtil.updateVoterResponse(
                     Errors.REQUEST_TIMED_OUT,
@@ -160,7 +160,7 @@ public final class UpdateVoterHandler {
         }
 
         // Check that endpoinds includes the default listener
-        if (!voterEndpoints.address(defaultListenerName).isPresent()) {
+        if (voterEndpoints.address(defaultListenerName).isEmpty()) {
             return CompletableFuture.completedFuture(
                 RaftUtil.updateVoterResponse(
                     Errors.INVALID_REQUEST,
@@ -188,7 +188,7 @@ public final class UpdateVoterHandler {
                     )
                 )
             );
-        if (!updatedVoters.isPresent()) {
+        if (updatedVoters.isEmpty()) {
             return CompletableFuture.completedFuture(
                 RaftUtil.updateVoterResponse(
                     Errors.VOTER_NOT_FOUND,
diff --git 
a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java 
b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java
index c865dc2a1a4..fc815621d83 100644
--- a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java
+++ b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java
@@ -61,7 +61,7 @@ public final class RecordsSnapshotReader<T> implements 
SnapshotReader<T> {
 
     @Override
     public long lastContainedLogTimestamp() {
-        if (!lastContainedLogTimestamp.isPresent()) {
+        if (lastContainedLogTimestamp.isEmpty()) {
             nextBatch.ifPresent(batch -> {
                 throw new IllegalStateException(
                     String.format(
@@ -83,7 +83,7 @@ public final class RecordsSnapshotReader<T> implements 
SnapshotReader<T> {
 
     @Override
     public boolean hasNext() {
-        if (!nextBatch.isPresent()) {
+        if (nextBatch.isEmpty()) {
             nextBatch = nextBatch();
         }
 
@@ -127,7 +127,7 @@ public final class RecordsSnapshotReader<T> implements 
SnapshotReader<T> {
         if (iterator.hasNext()) {
             Batch<T> batch = iterator.next();
 
-            if (!lastContainedLogTimestamp.isPresent()) {
+            if (lastContainedLogTimestamp.isEmpty()) {
                 // This must be the first batch which is expected to be a control batch with at least one record for
                 // the snapshot header.
                 if (batch.controlRecords().isEmpty()) {
diff --git 
a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java 
b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java
index ef26ce3bc05..47683d68bdb 100644
--- a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java
+++ b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java
@@ -192,7 +192,7 @@ public final class RecordsSnapshotWriter<T> implements 
SnapshotWriter<T> {
         }
 
         public <T> RecordsSnapshotWriter<T> build(RecordSerde<T> serde) {
-            if (!rawSnapshotWriter.isPresent()) {
+            if (rawSnapshotWriter.isEmpty()) {
                 throw new IllegalStateException("Builder::build called without a RawSnapshotWriter");
             } else if (rawSnapshotWriter.get().sizeInBytes() != 0) {
                 throw new IllegalStateException(
diff --git a/raft/src/test/java/org/apache/kafka/raft/EndpointsTest.java 
b/raft/src/test/java/org/apache/kafka/raft/EndpointsTest.java
index a4a39de0972..b83a46bf89f 100644
--- a/raft/src/test/java/org/apache/kafka/raft/EndpointsTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/EndpointsTest.java
@@ -41,8 +41,8 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 
 final class EndpointsTest {
-    private ListenerName testListener = ListenerName.normalised("listener");
-    private InetSocketAddress testSocketAddress = InetSocketAddress.createUnresolved("localhost", 9092);
+    private final ListenerName testListener = ListenerName.normalised("listener");
+    private final InetSocketAddress testSocketAddress = InetSocketAddress.createUnresolved("localhost", 9092);
 
     @Test
     void testAddressWithValidEndpoint() {
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java 
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
index 9e5d68d5e6a..0443af88845 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
@@ -380,7 +380,7 @@ public class KafkaRaftClientReconfigTest {
             apiVersionsResponse(Errors.NONE)
         );
 
-        // Handle the the API_VERSIONS response
+        // Handle the API_VERSIONS response
         context.client.poll();
         // Append new VotersRecord to log
         context.client.poll();
@@ -735,7 +735,7 @@ public class KafkaRaftClientReconfigTest {
             apiVersionsResponse(Errors.NONE)
         );
 
-        // Handle the the API_VERSIONS response
+        // Handle the API_VERSIONS response
         context.client.poll();
 
         // Wait for request timeout without sending a FETCH request to timeout the add voter RPC
@@ -1101,7 +1101,7 @@ public class KafkaRaftClientReconfigTest {
         context.pollUntilResponse();
         context.assertSentFetchPartitionResponse(Errors.NONE, epoch, 
OptionalInt.of(local.id()));
 
-        // Send a FETCH request for follower2 and increaes the HWM
+        // Send a FETCH request for follower2 and increase the HWM
         context.deliverRequest(
             context.fetchRequest(epoch, follower2, 
context.log.endOffset().offset(), epoch, 0)
         );
@@ -1123,8 +1123,8 @@ public class KafkaRaftClientReconfigTest {
         // Calls to resign should be allowed and not throw an exception
         context.client.resign(epoch);
 
-        // Election timeout is random numer in [electionTimeoutMs, 2 * electionTimeoutMs)
-        context.time.sleep(2 * context.electionTimeoutMs());
+        // Election timeout is random number in [electionTimeoutMs, 2 * electionTimeoutMs)
+        context.time.sleep(2L * context.electionTimeoutMs());
         context.client.poll();
 
         assertTrue(context.client.quorum().isObserver());
@@ -1578,7 +1578,7 @@ public class KafkaRaftClientReconfigTest {
             )
         );
 
-        // Expect reply for UpdateVoter request without commiting the record
+        // Expect reply for UpdateVoter request without committing the record
         context.pollUntilResponse();
         context.assertSentUpdateVoterResponse(
             Errors.NONE,
@@ -1767,7 +1767,7 @@ public class KafkaRaftClientReconfigTest {
             .withUnknownLeader(3)
             .build();
 
-        // Attempt to uodate voter in the quorum
+        // Attempt to update voter in the quorum
         context.deliverRequest(
             context.updateVoterRequest(
                 follower,
@@ -2248,14 +2248,14 @@ public class KafkaRaftClientReconfigTest {
     @Test
     void testObserverDiscoversLeaderWithUnknownVoters() throws Exception {
         ReplicaKey local = replicaKey(randomReplicaId(), true);
-        InetSocketAddress bootstrapAdddress = InetSocketAddress.createUnresolved("localhost", 1234);
+        InetSocketAddress bootstrapAddress = InetSocketAddress.createUnresolved("localhost", 1234);
         int epoch = 3;
 
         RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get())
             .withKip853Rpc(true)
             .withBootstrapSnapshot(Optional.empty())
             .withUnknownLeader(epoch)
-            .withBootstrapServers(Optional.of(Collections.singletonList(bootstrapAdddress)))
+            .withBootstrapServers(Optional.of(Collections.singletonList(bootstrapAddress)))
             .build();
 
         context.pollUntilRequest();
diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLog.java b/raft/src/test/java/org/apache/kafka/raft/MockLog.java
index 29281fa633f..4695d18f72f 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockLog.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockLog.java
@@ -167,7 +167,7 @@ public class MockLog implements ReplicatedLog {
     }
 
     private void assertValidHighWatermarkMetadata(LogOffsetMetadata offsetMetadata) {
-        if (!offsetMetadata.metadata().isPresent()) {
+        if (offsetMetadata.metadata().isEmpty()) {
             return;
         }
 
diff --git a/raft/src/test/java/org/apache/kafka/raft/MockNetworkChannel.java b/raft/src/test/java/org/apache/kafka/raft/MockNetworkChannel.java
index c8e732e8805..47785fab4be 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockNetworkChannel.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockNetworkChannel.java
@@ -66,7 +66,7 @@ public class MockNetworkChannel implements NetworkChannel {
         Iterator<RaftRequest.Outbound> iterator = sendQueue.iterator();
         while (iterator.hasNext()) {
             RaftRequest.Outbound request = iterator.next();
-            if (!apiKeyFilter.isPresent() || request.data().apiKey() == apiKeyFilter.get().id) {
+            if (apiKeyFilter.isEmpty() || request.data().apiKey() == apiKeyFilter.get().id) {
                 awaitingResponse.put(request.correlationId(), request);
                 requests.add(request);
                 iterator.remove();
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
index 91bb6de70b0..8aa599079f7 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
@@ -990,7 +990,7 @@ public class RaftEventSimulationTest {
                 Integer oldEpoch = nodeEpochs.get(nodeId);
 
                 Optional<ElectionState> electionState = state.store.readElectionState();
-                if (!electionState.isPresent()) {
+                if (electionState.isEmpty()) {
                     continue;
                 }
 
@@ -1171,7 +1171,7 @@ public class RaftEventSimulationTest {
             final MockLog log = node.log;
 
             OptionalLong highWatermark = manager.highWatermark();
-            if (!highWatermark.isPresent()) {
+            if (highWatermark.isEmpty()) {
                 // We cannot do validation if the current high watermark is unknown
                 return;
             }
diff --git a/raft/src/test/java/org/apache/kafka/raft/ReplicatedCounter.java b/raft/src/test/java/org/apache/kafka/raft/ReplicatedCounter.java
index 4b35d9e7551..4bb1b451b3f 100644
--- a/raft/src/test/java/org/apache/kafka/raft/ReplicatedCounter.java
+++ b/raft/src/test/java/org/apache/kafka/raft/ReplicatedCounter.java
@@ -58,7 +58,7 @@ public class ReplicatedCounter implements RaftClient.Listener<Integer> {
     }
 
     public synchronized void increment() {
-        if (!claimedEpoch.isPresent()) {
+        if (claimedEpoch.isEmpty()) {
             throw new KafkaException("Counter is not currently writable");
         }
 
diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/BatchBuilderTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/BatchBuilderTest.java
index ccb28f45477..2c4765804a9 100644
--- a/raft/src/test/java/org/apache/kafka/raft/internals/BatchBuilderTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/internals/BatchBuilderTest.java
@@ -112,7 +112,7 @@ class BatchBuilderTest {
 
         String record = "i am a record";
 
-        while (!builder.bytesNeeded(Collections.singletonList(record), null).isPresent()) {
+        while (builder.bytesNeeded(Collections.singletonList(record), null).isEmpty()) {
             builder.appendRecord(record, null);
         }
 


Reply via email to