This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c0a092f5626 MINOR: Cleanups in raft module (#17877)
c0a092f5626 is described below
commit c0a092f56262fcc32534396f5d89a6bfa32064e5
Author: Mickael Maison <[email protected]>
AuthorDate: Thu Nov 21 15:19:07 2024 +0100
MINOR: Cleanups in raft module (#17877)
Reviewers: Yash Mayya <[email protected]>
---
.../java/org/apache/kafka/raft/ElectionState.java | 8 +++----
.../java/org/apache/kafka/raft/FollowerState.java | 4 ++--
.../org/apache/kafka/raft/KafkaRaftClient.java | 28 +++++++++++-----------
.../java/org/apache/kafka/raft/LeaderState.java | 7 +++---
.../java/org/apache/kafka/raft/QuorumState.java | 8 +++----
.../org/apache/kafka/raft/UnattachedState.java | 2 +-
.../kafka/raft/internals/AddVoterHandler.java | 10 ++++----
.../kafka/raft/internals/AddVoterHandlerState.java | 2 +-
.../kafka/raft/internals/RecordsBatchReader.java | 2 +-
.../kafka/raft/internals/RecordsIterator.java | 8 +++----
.../kafka/raft/internals/RemoveVoterHandler.java | 6 ++---
.../kafka/raft/internals/UpdateVoterHandler.java | 8 +++----
.../kafka/snapshot/RecordsSnapshotReader.java | 6 ++---
.../kafka/snapshot/RecordsSnapshotWriter.java | 2 +-
.../java/org/apache/kafka/raft/EndpointsTest.java | 4 ++--
.../kafka/raft/KafkaRaftClientReconfigTest.java | 18 +++++++-------
.../test/java/org/apache/kafka/raft/MockLog.java | 2 +-
.../org/apache/kafka/raft/MockNetworkChannel.java | 2 +-
.../apache/kafka/raft/RaftEventSimulationTest.java | 4 ++--
.../org/apache/kafka/raft/ReplicatedCounter.java | 2 +-
.../kafka/raft/internals/BatchBuilderTest.java | 2 +-
21 files changed, 67 insertions(+), 68 deletions(-)
diff --git a/raft/src/main/java/org/apache/kafka/raft/ElectionState.java
b/raft/src/main/java/org/apache/kafka/raft/ElectionState.java
index e65e72890f5..675436cc52c 100644
--- a/raft/src/main/java/org/apache/kafka/raft/ElectionState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/ElectionState.java
@@ -73,11 +73,11 @@ public final class ElectionState {
public boolean isVotedCandidate(ReplicaKey nodeKey) {
if (nodeKey.id() < 0) {
throw new IllegalArgumentException("Invalid node key " + nodeKey);
- } else if (!votedKey.isPresent()) {
+ } else if (votedKey.isEmpty()) {
return false;
} else if (votedKey.get().id() != nodeKey.id()) {
return false;
- } else if (!votedKey.get().directoryId().isPresent()) {
+ } else if (votedKey.get().directoryId().isEmpty()) {
// when the persisted voted directory id is not present assume
that we voted for this candidate;
// this happens when the kraft version is 0.
return true;
@@ -87,7 +87,7 @@ public final class ElectionState {
}
public int leaderId() {
- if (!leaderId.isPresent())
+ if (leaderId.isEmpty())
throw new IllegalStateException("Attempt to access nil leaderId");
return leaderId.getAsInt();
}
@@ -101,7 +101,7 @@ public final class ElectionState {
}
public ReplicaKey votedKey() {
- if (!votedKey.isPresent()) {
+ if (votedKey.isEmpty()) {
throw new IllegalStateException("Attempt to access nil votedId");
}
diff --git a/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
b/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
index 49eecab5d61..67748f54adc 100644
--- a/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/FollowerState.java
@@ -131,7 +131,7 @@ public class FollowerState implements EpochState {
private long updateVoterPeriodMs() {
// Allow for a few rounds of fetch request before attempting to update
// the voter state
- return fetchTimeoutMs * 3;
+ return fetchTimeoutMs * 3L;
}
public boolean hasUpdateVoterPeriodExpired(long currentTimeMs) {
@@ -150,7 +150,7 @@ public class FollowerState implements EpochState {
}
public boolean updateHighWatermark(OptionalLong newHighWatermark) {
- if (!newHighWatermark.isPresent() && highWatermark.isPresent()) {
+ if (newHighWatermark.isEmpty() && highWatermark.isPresent()) {
throw new IllegalArgumentException(
String.format("Attempt to overwrite current high watermark %s
with unknown value", highWatermark)
);
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 51aa5e59f2f..cf4612355f4 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -1210,7 +1210,7 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
int position = 0;
for (ReplicaKey candidate : preferredCandidates) {
if (candidate.id() == quorum.localIdOrThrow()) {
- if (!candidate.directoryId().isPresent() ||
+ if (candidate.directoryId().isEmpty() ||
candidate.directoryId().get().equals(quorum.localDirectoryId())
) {
// Found ourselves in the preferred candidate list
@@ -1788,7 +1788,7 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
Optional<FetchSnapshotRequestData.PartitionSnapshot>
partitionSnapshotOpt = FetchSnapshotRequest
.forTopicPartition(data, log.topicPartition());
- if (!partitionSnapshotOpt.isPresent()) {
+ if (partitionSnapshotOpt.isEmpty()) {
// The Raft client assumes that there is only one topic partition.
TopicPartition unknownTopicPartition = new TopicPartition(
data.topics().get(0).name(),
@@ -1828,7 +1828,7 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
);
Optional<RawSnapshotReader> snapshotOpt = log.readSnapshot(snapshotId);
- if (!snapshotOpt.isPresent() ||
snapshotId.equals(BOOTSTRAP_SNAPSHOT_ID)) {
+ if (snapshotOpt.isEmpty() || snapshotId.equals(BOOTSTRAP_SNAPSHOT_ID))
{
// The bootstrap checkpoint should not be replicated. The first
leader will
// make sure that the content of the bootstrap checkpoint is
included in the
// partition log
@@ -1944,7 +1944,7 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
Optional<FetchSnapshotResponseData.PartitionSnapshot>
partitionSnapshotOpt = FetchSnapshotResponse
.forTopicPartition(data, log.topicPartition());
- if (!partitionSnapshotOpt.isPresent()) {
+ if (partitionSnapshotOpt.isEmpty()) {
return false;
}
@@ -2098,7 +2098,7 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
}
Optional<ReplicaKey> newVoter = RaftUtil.addVoterRequestVoterKey(data);
- if (!newVoter.isPresent() ||
!newVoter.get().directoryId().isPresent()) {
+ if (newVoter.isEmpty() || newVoter.get().directoryId().isEmpty()) {
return completedFuture(
new AddRaftVoterResponseData()
.setErrorCode(Errors.INVALID_REQUEST.code())
@@ -2107,7 +2107,7 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
}
Endpoints newVoterEndpoints =
Endpoints.fromAddVoterRequest(data.listeners());
- if (!newVoterEndpoints.address(channel.listenerName()).isPresent()) {
+ if (newVoterEndpoints.address(channel.listenerName()).isEmpty()) {
return completedFuture(
new AddRaftVoterResponseData()
.setErrorCode(Errors.INVALID_REQUEST.code())
@@ -2181,7 +2181,7 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
}
Optional<ReplicaKey> oldVoter =
RaftUtil.removeVoterRequestVoterKey(data);
- if (!oldVoter.isPresent() ||
!oldVoter.get().directoryId().isPresent()) {
+ if (oldVoter.isEmpty() || oldVoter.get().directoryId().isEmpty()) {
return completedFuture(
new RemoveRaftVoterResponseData()
.setErrorCode(Errors.INVALID_REQUEST.code())
@@ -2226,7 +2226,7 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
}
Optional<ReplicaKey> voter = RaftUtil.updateVoterRequestVoterKey(data);
- if (!voter.isPresent() || !voter.get().directoryId().isPresent()) {
+ if (voter.isEmpty() || voter.get().directoryId().isEmpty()) {
return completedFuture(
RaftUtil.updateVoterResponse(
Errors.INVALID_REQUEST,
@@ -2238,7 +2238,7 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
}
Endpoints voterEndpoints =
Endpoints.fromUpdateVoterRequest(data.listeners());
- if (!voterEndpoints.address(channel.listenerName()).isPresent()) {
+ if (voterEndpoints.address(channel.listenerName()).isEmpty()) {
return completedFuture(
RaftUtil.updateVoterResponse(
Errors.INVALID_REQUEST,
@@ -2319,8 +2319,8 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
return quorum.isLeader();
} else {
return epoch != quorum.epoch()
- || !leaderId.isPresent()
- || !quorum.leaderId().isPresent()
+ || leaderId.isEmpty()
+ || quorum.leaderId().isEmpty()
|| leaderId.equals(quorum.leaderId());
}
}
@@ -2516,7 +2516,7 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
return voterKey
.map(key -> {
if (!OptionalInt.of(key.id()).equals(nodeId)) return false;
- if (!key.directoryId().isPresent()) return true;
+ if (key.directoryId().isEmpty()) return true;
return key.directoryId().get().equals(nodeDirectoryId);
})
@@ -3399,7 +3399,7 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
// Note that if we transition to another state before we have a
chance to
// request resignation, then we consider the call fulfilled.
Optional<LeaderState<Object>> leaderStateOpt =
quorum.maybeLeaderState();
- if (!leaderStateOpt.isPresent()) {
+ if (leaderStateOpt.isEmpty()) {
logger.debug("Ignoring call to resign from epoch {} since this
node is " +
"no longer the leader", epoch);
return;
@@ -3702,7 +3702,7 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
return true;
} else {
return leaderAndEpoch.leaderId().isPresent() &&
- !lastFiredLeaderChange.leaderId().isPresent();
+ lastFiredLeaderChange.leaderId().isEmpty();
}
}
diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
index c09282c87c9..0024c849b21 100644
--- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
@@ -35,7 +35,6 @@ import org.apache.kafka.server.common.KRaftVersion;
import org.slf4j.Logger;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -112,7 +111,7 @@ public class LeaderState<T> implements EpochState {
new ReplicaState(voterNode.voterKey(), hasAcknowledgedLeader,
voterNode.listeners())
);
}
- this.grantingVoters = Collections.unmodifiableSet(new
HashSet<>(grantingVoters));
+ this.grantingVoters = Set.copyOf(grantingVoters);
this.log = logContext.logger(LeaderState.class);
this.accumulator = Objects.requireNonNull(accumulator, "accumulator
must be non-null");
// use the 1.5x of fetch timeout to tolerate some network transition
time or other IO time.
@@ -809,9 +808,9 @@ public class LeaderState<T> implements EpochState {
public int compareTo(ReplicaState that) {
if (this.endOffset.equals(that.endOffset))
return this.replicaKey.compareTo(that.replicaKey);
- else if (!this.endOffset.isPresent())
+ else if (this.endOffset.isEmpty())
return 1;
- else if (!that.endOffset.isPresent())
+ else if (that.endOffset.isEmpty())
return -1;
else
return Long.compare(that.endOffset.get().offset(),
this.endOffset.get().offset());
diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
index 0598ce062df..de82977583a 100644
--- a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
@@ -140,7 +140,7 @@ public class QuorumState {
ElectionState election = readElectionState();
final EpochState initialState;
- if (election.hasVoted() && !localId.isPresent()) {
+ if (election.hasVoted() && localId.isEmpty()) {
throw new IllegalStateException(
String.format(
"Initialized quorum state (%s) with a voted candidate but
without a local id",
@@ -332,7 +332,7 @@ public class QuorumState {
}
public boolean isVoter() {
- if (!localId.isPresent()) {
+ if (localId.isEmpty()) {
return false;
}
@@ -425,7 +425,7 @@ public class QuorumState {
epoch
)
);
- } else if (!localId.isPresent()) {
+ } else if (localId.isEmpty()) {
throw new IllegalStateException("Cannot transition to voted
without a replica id");
} else if (epoch < currentEpoch) {
throw new IllegalStateException(
@@ -707,7 +707,7 @@ public class QuorumState {
}
public boolean isUnattachedNotVoted() {
- return maybeUnattachedState().filter(unattached ->
!unattached.votedKey().isPresent()).isPresent();
+ return maybeUnattachedState().filter(unattached ->
unattached.votedKey().isEmpty()).isPresent();
}
public boolean isUnattachedAndVoted() {
diff --git a/raft/src/main/java/org/apache/kafka/raft/UnattachedState.java
b/raft/src/main/java/org/apache/kafka/raft/UnattachedState.java
index 4b21849f818..f41c6b8d563 100644
--- a/raft/src/main/java/org/apache/kafka/raft/UnattachedState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/UnattachedState.java
@@ -123,7 +123,7 @@ public class UnattachedState implements EpochState {
if (votedKey.isPresent()) {
ReplicaKey votedReplicaKey = votedKey.get();
if (votedReplicaKey.id() == candidateKey.id()) {
- return !votedReplicaKey.directoryId().isPresent() ||
votedReplicaKey.directoryId().equals(candidateKey.directoryId());
+ return votedReplicaKey.directoryId().isEmpty() ||
votedReplicaKey.directoryId().equals(candidateKey.directoryId());
}
log.debug(
"Rejecting vote request from candidate ({}), already have
voted for another " +
diff --git
a/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandler.java
b/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandler.java
index 44b9eb2a39d..1f7ea2f61c4 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandler.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandler.java
@@ -101,7 +101,7 @@ public final class AddVoterHandler {
// Check that the leader has established a HWM and committed the
current epoch
Optional<Long> highWatermark =
leaderState.highWatermark().map(LogOffsetMetadata::offset);
- if (!highWatermark.isPresent()) {
+ if (highWatermark.isEmpty()) {
return CompletableFuture.completedFuture(
RaftUtil.addVoterResponse(
Errors.REQUEST_TIMED_OUT,
@@ -127,7 +127,7 @@ public final class AddVoterHandler {
// Check that there are no uncommitted VotersRecord
Optional<LogHistory.Entry<VoterSet>> votersEntry =
partitionState.lastVoterSetEntry();
- if (!votersEntry.isPresent() || votersEntry.get().offset() >=
highWatermark.get()) {
+ if (votersEntry.isEmpty() || votersEntry.get().offset() >=
highWatermark.get()) {
return CompletableFuture.completedFuture(
RaftUtil.addVoterResponse(
Errors.REQUEST_TIMED_OUT,
@@ -172,7 +172,7 @@ public final class AddVoterHandler {
this::buildApiVersionsRequest,
currentTimeMs
);
- if (!timeout.isPresent()) {
+ if (timeout.isEmpty()) {
return CompletableFuture.completedFuture(
RaftUtil.addVoterResponse(
Errors.REQUEST_TIMED_OUT,
@@ -203,7 +203,7 @@ public final class AddVoterHandler {
long currentTimeMs
) {
Optional<AddVoterHandlerState> handlerState =
leaderState.addVoterHandlerState();
- if (!handlerState.isPresent()) {
+ if (handlerState.isEmpty()) {
// There is no pending add operation; just ignore the API response
return true;
}
@@ -242,7 +242,7 @@ public final class AddVoterHandler {
return false;
}
- // Check that the new voter supports the kraft.verion for
reconfiguration
+ // Check that the new voter supports the kraft.version for
reconfiguration
KRaftVersion kraftVersion = partitionState.lastKraftVersion();
if (!validVersionRange(kraftVersion, supportedKraftVersions)) {
logger.info(
diff --git
a/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandlerState.java
b/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandlerState.java
index c403d0e0cd2..b43197c273d 100644
---
a/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandlerState.java
+++
b/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandlerState.java
@@ -49,7 +49,7 @@ public final class AddVoterHandlerState {
}
public boolean expectingApiResponse(int replicaId) {
- return !lastOffset.isPresent() && replicaId == voterKey.id();
+ return lastOffset.isEmpty() && replicaId == voterKey.id();
}
public void setLastOffset(long lastOffset) {
diff --git
a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java
b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java
index a9f4e106aa1..64572b6bc49 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java
@@ -51,7 +51,7 @@ public final class RecordsBatchReader<T> implements
BatchReader<T> {
public boolean hasNext() {
ensureOpen();
- if (!nextBatch.isPresent()) {
+ if (nextBatch.isEmpty()) {
nextBatch = nextBatch();
}
diff --git
a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java
b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java
index 82991389748..2ea3126de77 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java
@@ -86,7 +86,7 @@ public final class RecordsIterator<T> implements
Iterator<Batch<T>>, AutoCloseab
public boolean hasNext() {
ensureOpen();
- if (!nextBatch.isPresent()) {
+ if (nextBatch.isEmpty()) {
nextBatch = nextBatch();
}
@@ -334,7 +334,7 @@ public final class RecordsIterator<T> implements
Iterator<Batch<T>>, AutoCloseab
throw new IllegalArgumentException("Got key in the record when no
key was expected");
}
- if (!value.isPresent()) {
+ if (value.isEmpty()) {
throw new IllegalArgumentException("Missing value in the record
when a value was expected");
} else if (value.get().remaining() == 0) {
throw new IllegalArgumentException("Got an unexpected empty value
in the record");
@@ -346,13 +346,13 @@ public final class RecordsIterator<T> implements
Iterator<Batch<T>>, AutoCloseab
}
private static ControlRecord decodeControlRecord(Optional<ByteBuffer> key,
Optional<ByteBuffer> value) {
- if (!key.isPresent()) {
+ if (key.isEmpty()) {
throw new IllegalArgumentException("Missing key in the record when
a key was expected");
} else if (key.get().remaining() == 0) {
throw new IllegalArgumentException("Got an unexpected empty key in
the record");
}
- if (!value.isPresent()) {
+ if (value.isEmpty()) {
throw new IllegalArgumentException("Missing value in the record
when a value was expected");
} else if (value.get().remaining() == 0) {
throw new IllegalArgumentException("Got an unexpected empty value
in the record");
diff --git
a/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandler.java
b/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandler.java
index 29093cc30b6..2dea86d593b 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandler.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandler.java
@@ -91,7 +91,7 @@ public final class RemoveVoterHandler {
// Check that the leader has established a HWM and committed the
current epoch
Optional<Long> highWatermark =
leaderState.highWatermark().map(LogOffsetMetadata::offset);
- if (!highWatermark.isPresent()) {
+ if (highWatermark.isEmpty()) {
return CompletableFuture.completedFuture(
RaftUtil.removeVoterResponse(
Errors.REQUEST_TIMED_OUT,
@@ -117,7 +117,7 @@ public final class RemoveVoterHandler {
// Check that there are no uncommitted VotersRecord
Optional<LogHistory.Entry<VoterSet>> votersEntry =
partitionState.lastVoterSetEntry();
- if (!votersEntry.isPresent() || votersEntry.get().offset() >=
highWatermark.get()) {
+ if (votersEntry.isEmpty() || votersEntry.get().offset() >=
highWatermark.get()) {
return CompletableFuture.completedFuture(
RaftUtil.removeVoterResponse(
Errors.REQUEST_TIMED_OUT,
@@ -132,7 +132,7 @@ public final class RemoveVoterHandler {
// Remove the voter from the set of voters
Optional<VoterSet> newVoters =
votersEntry.get().value().removeVoter(voterKey);
- if (!newVoters.isPresent()) {
+ if (newVoters.isEmpty()) {
return CompletableFuture.completedFuture(
RaftUtil.removeVoterResponse(
Errors.VOTER_NOT_FOUND,
diff --git
a/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandler.java
b/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandler.java
index 417c1decad7..335e1b02a22 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandler.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/UpdateVoterHandler.java
@@ -97,7 +97,7 @@ public final class UpdateVoterHandler {
// Check that the leader has established a HWM and committed the
current epoch
Optional<Long> highWatermark =
leaderState.highWatermark().map(LogOffsetMetadata::offset);
- if (!highWatermark.isPresent()) {
+ if (highWatermark.isEmpty()) {
return CompletableFuture.completedFuture(
RaftUtil.updateVoterResponse(
Errors.REQUEST_TIMED_OUT,
@@ -130,7 +130,7 @@ public final class UpdateVoterHandler {
// Check that there are no uncommitted VotersRecord
Optional<LogHistory.Entry<VoterSet>> votersEntry =
partitionState.lastVoterSetEntry();
- if (!votersEntry.isPresent() || votersEntry.get().offset() >=
highWatermark.get()) {
+ if (votersEntry.isEmpty() || votersEntry.get().offset() >=
highWatermark.get()) {
return CompletableFuture.completedFuture(
RaftUtil.updateVoterResponse(
Errors.REQUEST_TIMED_OUT,
@@ -160,7 +160,7 @@ public final class UpdateVoterHandler {
}
// Check that the endpoints include the default listener
- if (!voterEndpoints.address(defaultListenerName).isPresent()) {
+ if (voterEndpoints.address(defaultListenerName).isEmpty()) {
return CompletableFuture.completedFuture(
RaftUtil.updateVoterResponse(
Errors.INVALID_REQUEST,
@@ -188,7 +188,7 @@ public final class UpdateVoterHandler {
)
)
);
- if (!updatedVoters.isPresent()) {
+ if (updatedVoters.isEmpty()) {
return CompletableFuture.completedFuture(
RaftUtil.updateVoterResponse(
Errors.VOTER_NOT_FOUND,
diff --git
a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java
b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java
index c865dc2a1a4..fc815621d83 100644
--- a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java
+++ b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java
@@ -61,7 +61,7 @@ public final class RecordsSnapshotReader<T> implements
SnapshotReader<T> {
@Override
public long lastContainedLogTimestamp() {
- if (!lastContainedLogTimestamp.isPresent()) {
+ if (lastContainedLogTimestamp.isEmpty()) {
nextBatch.ifPresent(batch -> {
throw new IllegalStateException(
String.format(
@@ -83,7 +83,7 @@ public final class RecordsSnapshotReader<T> implements
SnapshotReader<T> {
@Override
public boolean hasNext() {
- if (!nextBatch.isPresent()) {
+ if (nextBatch.isEmpty()) {
nextBatch = nextBatch();
}
@@ -127,7 +127,7 @@ public final class RecordsSnapshotReader<T> implements
SnapshotReader<T> {
if (iterator.hasNext()) {
Batch<T> batch = iterator.next();
- if (!lastContainedLogTimestamp.isPresent()) {
+ if (lastContainedLogTimestamp.isEmpty()) {
// This must be the first batch which is expected to be a
control batch with at least one record for
// the snapshot header.
if (batch.controlRecords().isEmpty()) {
diff --git
a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java
b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java
index ef26ce3bc05..47683d68bdb 100644
--- a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java
+++ b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java
@@ -192,7 +192,7 @@ public final class RecordsSnapshotWriter<T> implements
SnapshotWriter<T> {
}
public <T> RecordsSnapshotWriter<T> build(RecordSerde<T> serde) {
- if (!rawSnapshotWriter.isPresent()) {
+ if (rawSnapshotWriter.isEmpty()) {
throw new IllegalStateException("Builder::build called without
a RawSnapshotWriter");
} else if (rawSnapshotWriter.get().sizeInBytes() != 0) {
throw new IllegalStateException(
diff --git a/raft/src/test/java/org/apache/kafka/raft/EndpointsTest.java
b/raft/src/test/java/org/apache/kafka/raft/EndpointsTest.java
index a4a39de0972..b83a46bf89f 100644
--- a/raft/src/test/java/org/apache/kafka/raft/EndpointsTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/EndpointsTest.java
@@ -41,8 +41,8 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
final class EndpointsTest {
- private ListenerName testListener = ListenerName.normalised("listener");
- private InetSocketAddress testSocketAddress =
InetSocketAddress.createUnresolved("localhost", 9092);
+ private final ListenerName testListener =
ListenerName.normalised("listener");
+ private final InetSocketAddress testSocketAddress =
InetSocketAddress.createUnresolved("localhost", 9092);
@Test
void testAddressWithValidEndpoint() {
diff --git
a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
index 9e5d68d5e6a..0443af88845 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
@@ -380,7 +380,7 @@ public class KafkaRaftClientReconfigTest {
apiVersionsResponse(Errors.NONE)
);
- // Handle the the API_VERSIONS response
+ // Handle the API_VERSIONS response
context.client.poll();
// Append new VotersRecord to log
context.client.poll();
@@ -735,7 +735,7 @@ public class KafkaRaftClientReconfigTest {
apiVersionsResponse(Errors.NONE)
);
- // Handle the the API_VERSIONS response
+ // Handle the API_VERSIONS response
context.client.poll();
// Wait for request timeout without sending a FETCH request to timeout
the add voter RPC
@@ -1101,7 +1101,7 @@ public class KafkaRaftClientReconfigTest {
context.pollUntilResponse();
context.assertSentFetchPartitionResponse(Errors.NONE, epoch,
OptionalInt.of(local.id()));
- // Send a FETCH request for follower2 and increaes the HWM
+ // Send a FETCH request for follower2 and increase the HWM
context.deliverRequest(
context.fetchRequest(epoch, follower2,
context.log.endOffset().offset(), epoch, 0)
);
@@ -1123,8 +1123,8 @@ public class KafkaRaftClientReconfigTest {
// Calls to resign should be allowed and not throw an exception
context.client.resign(epoch);
- // Election timeout is random numer in [electionTimeoutMs, 2 *
electionTimeoutMs)
- context.time.sleep(2 * context.electionTimeoutMs());
+ // Election timeout is random number in [electionTimeoutMs, 2 *
electionTimeoutMs)
+ context.time.sleep(2L * context.electionTimeoutMs());
context.client.poll();
assertTrue(context.client.quorum().isObserver());
@@ -1578,7 +1578,7 @@ public class KafkaRaftClientReconfigTest {
)
);
- // Expect reply for UpdateVoter request without commiting the record
+ // Expect reply for UpdateVoter request without committing the record
context.pollUntilResponse();
context.assertSentUpdateVoterResponse(
Errors.NONE,
@@ -1767,7 +1767,7 @@ public class KafkaRaftClientReconfigTest {
.withUnknownLeader(3)
.build();
- // Attempt to uodate voter in the quorum
+ // Attempt to update voter in the quorum
context.deliverRequest(
context.updateVoterRequest(
follower,
@@ -2248,14 +2248,14 @@ public class KafkaRaftClientReconfigTest {
@Test
void testObserverDiscoversLeaderWithUnknownVoters() throws Exception {
ReplicaKey local = replicaKey(randomReplicaId(), true);
- InetSocketAddress bootstrapAdddress =
InetSocketAddress.createUnresolved("localhost", 1234);
+ InetSocketAddress bootstrapAddress =
InetSocketAddress.createUnresolved("localhost", 1234);
int epoch = 3;
RaftClientTestContext context = new
RaftClientTestContext.Builder(local.id(), local.directoryId().get())
.withKip853Rpc(true)
.withBootstrapSnapshot(Optional.empty())
.withUnknownLeader(epoch)
-
.withBootstrapServers(Optional.of(Collections.singletonList(bootstrapAdddress)))
+
.withBootstrapServers(Optional.of(Collections.singletonList(bootstrapAddress)))
.build();
context.pollUntilRequest();
diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLog.java
b/raft/src/test/java/org/apache/kafka/raft/MockLog.java
index 29281fa633f..4695d18f72f 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockLog.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockLog.java
@@ -167,7 +167,7 @@ public class MockLog implements ReplicatedLog {
}
private void assertValidHighWatermarkMetadata(LogOffsetMetadata
offsetMetadata) {
- if (!offsetMetadata.metadata().isPresent()) {
+ if (offsetMetadata.metadata().isEmpty()) {
return;
}
diff --git a/raft/src/test/java/org/apache/kafka/raft/MockNetworkChannel.java
b/raft/src/test/java/org/apache/kafka/raft/MockNetworkChannel.java
index c8e732e8805..47785fab4be 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockNetworkChannel.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockNetworkChannel.java
@@ -66,7 +66,7 @@ public class MockNetworkChannel implements NetworkChannel {
Iterator<RaftRequest.Outbound> iterator = sendQueue.iterator();
while (iterator.hasNext()) {
RaftRequest.Outbound request = iterator.next();
- if (!apiKeyFilter.isPresent() || request.data().apiKey() ==
apiKeyFilter.get().id) {
+ if (apiKeyFilter.isEmpty() || request.data().apiKey() ==
apiKeyFilter.get().id) {
awaitingResponse.put(request.correlationId(), request);
requests.add(request);
iterator.remove();
diff --git
a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
index 91bb6de70b0..8aa599079f7 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
@@ -990,7 +990,7 @@ public class RaftEventSimulationTest {
Integer oldEpoch = nodeEpochs.get(nodeId);
Optional<ElectionState> electionState =
state.store.readElectionState();
- if (!electionState.isPresent()) {
+ if (electionState.isEmpty()) {
continue;
}
@@ -1171,7 +1171,7 @@ public class RaftEventSimulationTest {
final MockLog log = node.log;
OptionalLong highWatermark = manager.highWatermark();
- if (!highWatermark.isPresent()) {
+ if (highWatermark.isEmpty()) {
// We cannot do validation if the current high watermark is
unknown
return;
}
diff --git a/raft/src/test/java/org/apache/kafka/raft/ReplicatedCounter.java
b/raft/src/test/java/org/apache/kafka/raft/ReplicatedCounter.java
index 4b35d9e7551..4bb1b451b3f 100644
--- a/raft/src/test/java/org/apache/kafka/raft/ReplicatedCounter.java
+++ b/raft/src/test/java/org/apache/kafka/raft/ReplicatedCounter.java
@@ -58,7 +58,7 @@ public class ReplicatedCounter implements
RaftClient.Listener<Integer> {
}
public synchronized void increment() {
- if (!claimedEpoch.isPresent()) {
+ if (claimedEpoch.isEmpty()) {
throw new KafkaException("Counter is not currently writable");
}
diff --git
a/raft/src/test/java/org/apache/kafka/raft/internals/BatchBuilderTest.java
b/raft/src/test/java/org/apache/kafka/raft/internals/BatchBuilderTest.java
index ccb28f45477..2c4765804a9 100644
--- a/raft/src/test/java/org/apache/kafka/raft/internals/BatchBuilderTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/internals/BatchBuilderTest.java
@@ -112,7 +112,7 @@ class BatchBuilderTest {
String record = "i am a record";
- while (!builder.bytesNeeded(Collections.singletonList(record),
null).isPresent()) {
+ while (builder.bytesNeeded(Collections.singletonList(record),
null).isEmpty()) {
builder.appendRecord(record, null);
}