This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7e3dde99d7f MINOR: Various cleanups in raft (#16611)
7e3dde99d7f is described below
commit 7e3dde99d7fac8b1eb14dacf4dd8f7137d78bf80
Author: Mickael Maison <[email protected]>
AuthorDate: Thu Jul 18 12:48:29 2024 +0200
MINOR: Various cleanups in raft (#16611)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../org/apache/kafka/raft/KafkaRaftClient.java | 26 +++++++++++-----------
.../java/org/apache/kafka/raft/LeaderState.java | 2 +-
.../main/java/org/apache/kafka/raft/RaftUtil.java | 4 ++--
.../java/org/apache/kafka/raft/RequestManager.java | 10 ++++-----
.../org/apache/kafka/raft/UnattachedState.java | 4 ++--
.../org/apache/kafka/raft/internals/VoterSet.java | 5 +----
.../apache/kafka/raft/KafkaNetworkChannelTest.java | 6 ++---
.../kafka/raft/KafkaRaftClientReconfigTest.java | 4 ++--
.../org/apache/kafka/raft/LeaderStateTest.java | 2 +-
.../test/java/org/apache/kafka/raft/MockLog.java | 16 ++++++-------
.../java/org/apache/kafka/raft/MockLogTest.java | 16 +++++--------
.../apache/kafka/raft/RaftClientTestContext.java | 8 +++----
.../apache/kafka/raft/RaftEventSimulationTest.java | 12 +++++-----
.../org/apache/kafka/raft/UnattachedStateTest.java | 17 ++++----------
.../kafka/raft/internals/BatchAccumulatorTest.java | 14 ++++++------
.../kafka/raft/internals/BatchBuilderTest.java | 3 ++-
.../kafka/raft/internals/RecordsIteratorTest.java | 4 ++--
.../snapshot/NotifyingRawSnapshotWriterTest.java | 2 +-
.../kafka/snapshot/RecordsSnapshotWriterTest.java | 8 +++----
.../kafka/snapshot/SnapshotWriterReaderTest.java | 12 +++++-----
20 files changed, 78 insertions(+), 97 deletions(-)
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 17bf64f4666..5086532bb7e 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -1206,7 +1206,7 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
.setErrorCode(error.code())
.setLogStartOffset(log.startOffset())
.setHighWatermark(
- highWatermark.map(offsetMetadata ->
offsetMetadata.offset()).orElse(-1L)
+
highWatermark.map(LogOffsetMetadata::offset).orElse(-1L)
);
partitionData.currentLeader()
@@ -1991,7 +1991,7 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
* @param leaderId Optional leaderId from the response
* @param epoch Epoch received from the response
* @param leaderEndpoints the endpoints of the leader from the response
- * @param souce the node the sent the response
+ * @param source the node that sent the response
* @param currentTimeMs Current epoch time in milliseconds
* @return Optional value indicating whether the error was handled here
and the outcome of
* that handling. Specifically:
@@ -2012,8 +2012,8 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
long currentTimeMs
) {
if (leaderEndpoints.isEmpty() && leaderId.isPresent()) {
- // The response didn't include the leader enpoints because it is
from a replica
- // that doesn't support reconfiguration. Look up the the leader
endpoint in the
+ // The response didn't include the leader endpoints because it is
from a replica
+ // that doesn't support reconfiguration. Look up the leader
endpoint in the
// voter set.
leaderEndpoints = partitionState
.lastVoterSet()
@@ -2334,7 +2334,7 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
return minBackoffMs;
}
- private long maybeSendRequets(
+ private long maybeSendRequest(
long currentTimeMs,
Set<ReplicaKey> remoteVoters,
Function<Integer, Node> destinationSupplier,
@@ -2459,9 +2459,9 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
double elapsedTimePerRecord = (double) elapsedTime /
batch.numRecords;
kafkaRaftMetrics.updateCommitLatency(elapsedTimePerRecord,
appendTimeMs);
logger.debug("Completed commit of {} records up to last
offset {}", batch.numRecords, offsetAndEpoch);
- batch.records.ifPresent(records -> {
- maybeFireHandleCommit(batch.baseOffset, epoch,
batch.appendTimestamp(), batch.sizeInBytes(), records);
- });
+ batch.records.ifPresent(records ->
+ maybeFireHandleCommit(batch.baseOffset, epoch,
batch.appendTimestamp(), batch.sizeInBytes(), records)
+ );
}
});
} finally {
@@ -2501,7 +2501,7 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
long timeUntilNextBeginQuorumSend =
state.timeUntilBeginQuorumEpochTimerExpires(currentTimeMs);
if (timeUntilNextBeginQuorumSend == 0) {
VoterSet voters = partitionState.lastVoterSet();
- timeUntilNextBeginQuorumSend = maybeSendRequets(
+ timeUntilNextBeginQuorumSend = maybeSendRequest(
currentTimeMs,
voters
.voterKeys()
@@ -2587,7 +2587,7 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
// Continue sending Vote requests as long as we still have a chance to
win the election
if (!state.isVoteRejected()) {
VoterSet voters = partitionState.lastVoterSet();
- return maybeSendRequets(
+ return maybeSendRequest(
currentTimeMs,
state.unrecordedVoters(),
voterId -> voters
@@ -2783,9 +2783,9 @@ public final class KafkaRaftClient<T> implements
RaftClient<T> {
}
// Check listener progress to see if reads are expected
- quorum.highWatermark().ifPresent(highWatermarkMetadata -> {
- updateListenersProgress(highWatermarkMetadata.offset());
- });
+ quorum.highWatermark().ifPresent(highWatermarkMetadata ->
+ updateListenersProgress(highWatermarkMetadata.offset())
+ );
// Notify the new listeners of the latest leader and epoch
Optional<LeaderState<T>> leaderState = quorum.maybeLeaderState();
diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
index 9cdfbbf3d84..2a91b3591bb 100644
--- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
@@ -239,7 +239,7 @@ public class LeaderState<T> implements EpochState {
offsetOfVotersAtEpochStart.ifPresent(offset -> {
if (offset == -1) {
// Latest voter set came from the bootstrap checkpoint
(0-0.checkpoint)
- // rewrite the voter set to the log so that it is
replcated to the replicas.
+ // rewrite the voter set to the log so that it is
replicated to the replicas.
if (!kraftVersionAtEpochStart.isReconfigSupported()) {
throw new IllegalStateException(
String.format(
diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
index c28864b2f2b..816bd7a95ae 100644
--- a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
+++ b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
@@ -301,7 +301,7 @@ public class RaftUtil {
String clusterId,
int leaderEpoch,
int leaderId,
- Endpoints leaderEndponts,
+ Endpoints leaderEndpoints,
ReplicaKey voterKey
) {
return new BeginQuorumEpochRequestData()
@@ -322,7 +322,7 @@ public class RaftUtil {
)
)
)
- .setLeaderEndpoints(leaderEndponts.toBeginQuorumEpochRequest());
+ .setLeaderEndpoints(leaderEndpoints.toBeginQuorumEpochRequest());
}
public static BeginQuorumEpochResponseData
singletonBeginQuorumEpochResponse(
diff --git a/raft/src/main/java/org/apache/kafka/raft/RequestManager.java
b/raft/src/main/java/org/apache/kafka/raft/RequestManager.java
index 72eab680777..809cf7a50ae 100644
--- a/raft/src/main/java/org/apache/kafka/raft/RequestManager.java
+++ b/raft/src/main/java/org/apache/kafka/raft/RequestManager.java
@@ -38,7 +38,7 @@ import java.util.Random;
* When a request times out or completes successfully the collection will
transition back to the
* ready state.
*
- * When a request completes with an error it still transition to the backoff
state until
+ * When a request completes with an error it still transitions to the backoff
state until
* {@code retryBackoffMs}.
*/
public class RequestManager {
@@ -62,7 +62,7 @@ public class RequestManager {
}
/**
- * Returns true if there any connection with pending requests.
+ * Returns true if there are any connections with pending requests.
*
* This is useful for satisfying the invariant that there is only one
pending Fetch request.
* If there is more than one pending fetch request, it is possible for
the follower to write
@@ -105,7 +105,7 @@ public class RequestManager {
* @return a random ready bootstrap node
*/
public Optional<Node> findReadyBootstrapServer(long currentTimeMs) {
- // Check that there are no infilght requests accross any of the known
nodes not just
+ // Check that there are no inflight requests across any of the known
nodes not just
// the bootstrap servers
if (hasAnyInflightRequest(currentTimeMs)) {
return Optional.empty();
@@ -133,7 +133,7 @@ public class RequestManager {
* If there is a connection with a pending request it returns the amount
of time to wait until
* the request times out.
*
- * Returns zero, if there are no pending requests and at least one of the
boorstrap servers is
+ * Returns zero, if there are no pending requests and at least one of the
bootstrap servers is
* ready.
*
* If all of the bootstrap servers are backing off and there are no
pending requests, return
@@ -243,7 +243,7 @@ public class RequestManager {
public void onResponseResult(Node node, long correlationId, boolean
success, long timeMs) {
if (isResponseExpected(node, correlationId)) {
if (success) {
- // Mark the connection as ready by reseting it
+ // Mark the connection as ready by resetting it
reset(node);
} else {
// Backoff the connection
diff --git a/raft/src/main/java/org/apache/kafka/raft/UnattachedState.java
b/raft/src/main/java/org/apache/kafka/raft/UnattachedState.java
index 3c4680a8526..1dba6a70f28 100644
--- a/raft/src/main/java/org/apache/kafka/raft/UnattachedState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/UnattachedState.java
@@ -28,14 +28,14 @@ import java.util.OptionalInt;
import java.util.Set;
/**
- * A replica is "unattached" when it doesn't known the leader or the leader's
endpoint.
+ * A replica is "unattached" when it doesn't know the leader or the leader's
endpoint.
*
* Typically, a replica doesn't know the leader if the KRaft topic is
undergoing an election cycle.
*
* It is also possible for a replica to be unattached if it doesn't know the
leader's endpoint.
* This typically happens when a replica starts up and the known leader id is
not part of the local
* voter set. In that case, during startup the replica transitions to
unattached instead of
- * transitioning to follower. The unattched replica discovers the leader and
leader's endpoint
+ * transitioning to follower. The unattached replica discovers the leader and
leader's endpoint
* either through random Fetch requests to the bootstrap servers or through
BeginQuorumEpoch
* request from the leader.
*/
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java
b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java
index 2f961897982..cca955d41e3 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java
@@ -145,10 +145,7 @@ public final class VoterSet {
* Returns all of the voters.
*/
public Set<VoterNode> voterNodes() {
- return voters
- .values()
- .stream()
- .collect(Collectors.toSet());
+ return new HashSet<>(voters.values());
}
/**
diff --git
a/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java
b/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java
index 00000ec7d81..579508a2569 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java
@@ -285,12 +285,12 @@ public class KafkaNetworkChannelTest {
return VoteRequest.singletonRequest(topicPartition, clusterId,
leaderEpoch, leaderId, lastEpoch, 329);
case FETCH:
- FetchRequestData request =
RaftUtil.singletonFetchRequest(topicPartition, topicId, fetchPartition -> {
+ FetchRequestData request =
RaftUtil.singletonFetchRequest(topicPartition, topicId, fetchPartition ->
fetchPartition
.setCurrentLeaderEpoch(5)
.setFetchOffset(333)
- .setLastFetchedEpoch(5);
- });
+ .setLastFetchedEpoch(5)
+ );
request.setReplicaState(new
FetchRequestData.ReplicaState().setReplicaId(1));
return request;
diff --git
a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
index 84a9af918b6..678780db441 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
@@ -302,13 +302,13 @@ public class KafkaRaftClientReconfigTest {
new LeaderChangeMessage()
);
builder.appendKRaftVersionMessage(
- 0, // timesteamp
+ 0, // timestamp
new KRaftVersionRecord()
.setVersion(ControlRecordUtils.KRAFT_VERSION_CURRENT_VERSION)
.setKRaftVersion((short) 1)
);
builder.appendVotersMessage(
- 0, // timesteamp
+ 0, // timestamp
leadersVoterSet.toVotersRecord(ControlRecordUtils.KRAFT_VOTERS_CURRENT_VERSION)
);
MemoryRecords leaderRecords = builder.build();
diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
index a423d416c8b..dfb5469434d 100644
--- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
@@ -539,7 +539,7 @@ public class LeaderStateTest {
assertFalse(state.updateReplicaState(nodeKey2, 0, new
LogOffsetMetadata(15L)));
assertEquals(Optional.of(new LogOffsetMetadata(15L)),
state.highWatermark());
- // HW will not update to 16L until majority of remaining voterSet
(node1, node2) are at least 16L
+ // HW will not update to 16L until the majority of remaining voterSet
(node1, node2) are at least 16L
assertTrue(state.updateReplicaState(nodeKey2, 0, new
LogOffsetMetadata(16L)));
assertEquals(Optional.of(new LogOffsetMetadata(16L)),
state.highWatermark());
}
diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLog.java
b/raft/src/test/java/org/apache/kafka/raft/MockLog.java
index b8c2089caf2..29281fa633f 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockLog.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockLog.java
@@ -499,9 +499,9 @@ public class MockLog implements ReplicatedLog {
return Optional.empty();
} else {
return Optional.of(
- new MockRawSnapshotWriter(snapshotId, buffer -> {
- snapshots.putIfAbsent(snapshotId, new
MockRawSnapshotReader(snapshotId, buffer));
- })
+ new MockRawSnapshotWriter(snapshotId, buffer ->
+ snapshots.putIfAbsent(snapshotId, new
MockRawSnapshotReader(snapshotId, buffer))
+ )
);
}
}
@@ -569,12 +569,10 @@ public class MockLog implements ReplicatedLog {
return false;
});
- last.get().ifPresent(epochStartOffset -> {
- epochStartOffsets.add(
- 0,
- new EpochStartOffset(epochStartOffset.epoch,
snapshotId.offset())
- );
- });
+ last.get().ifPresent(epochStartOffset ->
+ epochStartOffsets.add(0, new
EpochStartOffset(epochStartOffset.epoch, snapshotId.offset())
+ )
+ );
updated = true;
}
diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
index 0c389a41522..08e19866d9b 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
@@ -168,21 +168,15 @@ public class MockLogTest {
// Throw exception for out of order records
assertThrows(
RuntimeException.class,
- () -> {
- log.appendAsLeader(
+ () -> log.appendAsLeader(
MemoryRecords.withRecords(initialOffset, Compression.NONE,
currentEpoch, recordFoo),
- currentEpoch
- );
- }
+ currentEpoch)
);
assertThrows(
RuntimeException.class,
- () -> {
- log.appendAsFollower(
- MemoryRecords.withRecords(initialOffset, Compression.NONE,
currentEpoch, recordFoo)
- );
- }
+ () -> log.appendAsFollower(
+ MemoryRecords.withRecords(initialOffset, Compression.NONE,
currentEpoch, recordFoo))
);
}
@@ -513,7 +507,7 @@ public class MockLogTest {
}
@Test
- public void testCreateSnapshotMuchEalierEpoch() {
+ public void testCreateSnapshotMuchEarlierEpoch() {
int numberOfRecords = 10;
int epoch = 2;
OffsetAndEpoch snapshotId = new OffsetAndEpoch(numberOfRecords, epoch);
diff --git
a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index 60147c07888..4b80eea2534 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -1097,9 +1097,9 @@ public final class RaftClientTestContext {
assertEquals(epoch, partitionRequest.leaderEpoch());
assertEquals(localIdOrThrow(), partitionRequest.leaderId());
- preferredSuccessorsOpt.ifPresent(preferredSuccessors -> {
- assertEquals(preferredSuccessors,
partitionRequest.preferredSuccessors());
- });
+ preferredSuccessorsOpt.ifPresent(preferredSuccessors ->
+ assertEquals(preferredSuccessors,
partitionRequest.preferredSuccessors())
+ );
collectedDestinationIdSet.add(raftMessage.destination().id());
endQuorumRequests.add(raftMessage);
@@ -1709,7 +1709,7 @@ public final class RaftClientTestContext {
return commits.stream()
.filter(batch -> batch.lastOffset() == lastOffset)
.findFirst()
- .map(batch -> batch.records())
+ .map(Batch::records)
.orElse(null);
}
diff --git
a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
index c57c441b98b..5954379cf96 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
@@ -996,9 +996,9 @@ public class RaftEventSimulationTest {
fail("Non-monotonic update of epoch detected on node " +
nodeId + ": " +
oldEpoch + " -> " + newEpoch);
}
- cluster.ifRunning(nodeId, nodeState -> {
- assertEquals(newEpoch, nodeState.client.quorum().epoch());
- });
+ cluster.ifRunning(nodeId, nodeState ->
+ assertEquals(newEpoch, nodeState.client.quorum().epoch())
+ );
nodeEpochs.put(nodeId, newEpoch);
}
}
@@ -1300,9 +1300,9 @@ public class RaftEventSimulationTest {
if (!filters.get(inflightRequest.sourceId).acceptInbound(inbound))
return;
- cluster.nodeIfRunning(inflightRequest.sourceId).ifPresent(node -> {
- node.channel.mockReceive(inbound);
- });
+ cluster.nodeIfRunning(inflightRequest.sourceId).ifPresent(node ->
+ node.channel.mockReceive(inbound)
+ );
}
void filter(int nodeId, NetworkFilter filter) {
diff --git a/raft/src/test/java/org/apache/kafka/raft/UnattachedStateTest.java
b/raft/src/test/java/org/apache/kafka/raft/UnattachedStateTest.java
index cd8bbb85678..38033c14ef0 100644
--- a/raft/src/test/java/org/apache/kafka/raft/UnattachedStateTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/UnattachedStateTest.java
@@ -113,18 +113,9 @@ public class UnattachedStateTest {
// Check that the leader is persisted if the leader is known
assertEquals(ElectionState.withElectedLeader(epoch, leaderId, voters),
state.election());
- // Check that the replcia rejects all votes request if the leader is
known
- assertEquals(
- false,
- state.canGrantVote(ReplicaKey.of(1, ReplicaKey.NO_DIRECTORY_ID),
isLogUpToDate)
- );
- assertEquals(
- false,
- state.canGrantVote(ReplicaKey.of(2, ReplicaKey.NO_DIRECTORY_ID),
isLogUpToDate)
- );
- assertEquals(
- false,
- state.canGrantVote(ReplicaKey.of(3, ReplicaKey.NO_DIRECTORY_ID),
isLogUpToDate)
- );
+ // Check that the replica rejects all vote requests if the leader is
known
+ assertFalse(state.canGrantVote(ReplicaKey.of(1,
ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate));
+ assertFalse(state.canGrantVote(ReplicaKey.of(2,
ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate));
+ assertFalse(state.canGrantVote(ReplicaKey.of(3,
ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate));
}
}
diff --git
a/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java
b/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java
index 8a4b774bae7..af2d5822ed5 100644
---
a/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java
+++
b/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java
@@ -494,10 +494,10 @@ class BatchAccumulatorTest {
List<BatchAccumulator.CompletedBatch<String>> drained = acc.drain();
assertEquals(1, drained.size());
assertEquals(Long.MAX_VALUE - time.milliseconds(),
acc.timeUntilDrain(time.milliseconds()));
- drained.forEach(completedBatch -> {
- completedBatch.data.batches().forEach(recordBatch -> {
- assertEquals(leaderEpoch, recordBatch.partitionLeaderEpoch());
});
- });
+ drained.forEach(completedBatch ->
+ completedBatch.data.batches().forEach(recordBatch ->
+ assertEquals(leaderEpoch, recordBatch.partitionLeaderEpoch()))
+ );
acc.close();
}
@@ -552,12 +552,12 @@ class BatchAccumulatorTest {
true));
} else {
assertEquals("Wanted base offset 156, but the next offset was 157",
- assertThrows(UnexpectedBaseOffsetException.class, () -> {
+ assertThrows(UnexpectedBaseOffsetException.class, () ->
acc.append(leaderEpoch,
singletonList("a"),
OptionalLong.of(baseOffset - 1),
- true);
- }).getMessage());
+ true)
+ ).getMessage());
}
acc.close();
}
diff --git
a/raft/src/test/java/org/apache/kafka/raft/internals/BatchBuilderTest.java
b/raft/src/test/java/org/apache/kafka/raft/internals/BatchBuilderTest.java
index ef0a648beaf..ccb28f45477 100644
--- a/raft/src/test/java/org/apache/kafka/raft/internals/BatchBuilderTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/internals/BatchBuilderTest.java
@@ -34,6 +34,7 @@ import java.util.List;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -81,7 +82,7 @@ class BatchBuilderTest {
assertEquals(compressionType, batch.compressionType());
assertEquals(baseOffset, batch.baseOffset());
assertEquals(logAppendTime, batch.maxTimestamp());
- assertEquals(false, batch.isControlBatch());
+ assertFalse(batch.isControlBatch());
assertEquals(leaderEpoch, batch.partitionLeaderEpoch());
List<String> builtRecords = Utils.toList(batch).stream()
diff --git
a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java
b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java
index 3e990037bb8..a8d34ec5b33 100644
---
a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java
+++
b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java
@@ -166,7 +166,7 @@ public final class RecordsIteratorTest {
.setKraftVersion(KRaftVersion.KRAFT_VERSION_0)
.setVoterSet(Optional.empty())
.setRawSnapshotWriter(
- new MockRawSnapshotWriter(new OffsetAndEpoch(100, 10),
snapshotBuf -> buffer.set(snapshotBuf))
+ new MockRawSnapshotWriter(new OffsetAndEpoch(100, 10),
buffer::set)
);
try (RecordsSnapshotWriter<String> snapshot =
builder.build(STRING_SERDE)) {
snapshot.append(Arrays.asList("a", "b", "c"));
@@ -216,7 +216,7 @@ public final class RecordsIteratorTest {
.setKraftVersion(KRaftVersion.KRAFT_VERSION_1)
.setVoterSet(Optional.of(voterSet))
.setRawSnapshotWriter(
- new MockRawSnapshotWriter(new OffsetAndEpoch(100, 10),
snapshotBuf -> buffer.set(snapshotBuf))
+ new MockRawSnapshotWriter(new OffsetAndEpoch(100, 10),
buffer::set)
);
try (RecordsSnapshotWriter<String> snapshot =
builder.build(STRING_SERDE)) {
snapshot.append(Arrays.asList("a", "b", "c"));
diff --git
a/raft/src/test/java/org/apache/kafka/snapshot/NotifyingRawSnapshotWriterTest.java
b/raft/src/test/java/org/apache/kafka/snapshot/NotifyingRawSnapshotWriterTest.java
index 7e9de94c589..50c0fa0408b 100644
---
a/raft/src/test/java/org/apache/kafka/snapshot/NotifyingRawSnapshotWriterTest.java
+++
b/raft/src/test/java/org/apache/kafka/snapshot/NotifyingRawSnapshotWriterTest.java
@@ -86,7 +86,7 @@ final class NotifyingRawSnapshotWriterTest {
assertTrue(rawWriter.closed);
}
- class NoopRawSnapshotWriter implements RawSnapshotWriter {
+ static class NoopRawSnapshotWriter implements RawSnapshotWriter {
boolean frozen = false;
boolean closed = false;
diff --git
a/raft/src/test/java/org/apache/kafka/snapshot/RecordsSnapshotWriterTest.java
b/raft/src/test/java/org/apache/kafka/snapshot/RecordsSnapshotWriterTest.java
index 60752ccdbbd..8a5d384462f 100644
---
a/raft/src/test/java/org/apache/kafka/snapshot/RecordsSnapshotWriterTest.java
+++
b/raft/src/test/java/org/apache/kafka/snapshot/RecordsSnapshotWriterTest.java
@@ -57,7 +57,7 @@ final class RecordsSnapshotWriterTest {
.setTime(new MockTime())
.setMaxBatchSize(maxBatchSize)
.setRawSnapshotWriter(
- new MockRawSnapshotWriter(snapshotId, snapshotBuf ->
buffer.set(snapshotBuf))
+ new MockRawSnapshotWriter(snapshotId, buffer::set)
);
try (RecordsSnapshotWriter<String> snapshot =
builder.build(STRING_SERDE)) {
snapshot.freeze();
@@ -109,7 +109,7 @@ final class RecordsSnapshotWriterTest {
.setTime(new MockTime())
.setMaxBatchSize(maxBatchSize)
.setRawSnapshotWriter(
- new MockRawSnapshotWriter(snapshotId, snapshotBuf ->
buffer.set(snapshotBuf))
+ new MockRawSnapshotWriter(snapshotId, buffer::set)
);
assertThrows(IllegalStateException.class, () ->
builder.build(STRING_SERDE));
@@ -129,7 +129,7 @@ final class RecordsSnapshotWriterTest {
.setTime(new MockTime())
.setMaxBatchSize(maxBatchSize)
.setRawSnapshotWriter(
- new MockRawSnapshotWriter(snapshotId, snapshotBuf ->
buffer.set(snapshotBuf))
+ new MockRawSnapshotWriter(snapshotId, buffer::set)
);
try (RecordsSnapshotWriter<String> snapshot =
builder.build(STRING_SERDE)) {
snapshot.freeze();
@@ -186,7 +186,7 @@ final class RecordsSnapshotWriterTest {
.setTime(new MockTime())
.setMaxBatchSize(maxBatchSize)
.setRawSnapshotWriter(
- new MockRawSnapshotWriter(snapshotId, snapshotBuf ->
buffer.set(snapshotBuf))
+ new MockRawSnapshotWriter(snapshotId, buffer::set)
);
try (RecordsSnapshotWriter<String> snapshot =
builder.build(STRING_SERDE)) {
snapshot.freeze();
diff --git
a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java
b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java
index e0243740c95..fd8a136a9e6 100644
--- a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java
+++ b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java
@@ -136,9 +136,9 @@ public final class SnapshotWriterReaderTest {
try (SnapshotWriter<String> snapshot =
context.client.createSnapshot(id, 0).get()) {
assertEquals(id, snapshot.snapshotId());
- expected.forEach(batch -> {
- assertDoesNotThrow(() -> snapshot.append(batch));
- });
+ expected.forEach(batch ->
+ assertDoesNotThrow(() -> snapshot.append(batch))
+ );
}
assertEquals(Optional.empty(), context.log.readSnapshot(id));
@@ -163,9 +163,9 @@ public final class SnapshotWriterReaderTest {
try (SnapshotWriter<String> snapshot =
context.client.createSnapshot(id, 0).get()) {
assertEquals(id, snapshot.snapshotId());
- expected.forEach(batch -> {
- assertDoesNotThrow(() -> snapshot.append(batch));
- });
+ expected.forEach(batch ->
+ assertDoesNotThrow(() -> snapshot.append(batch))
+ );
snapshot.freeze();