This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7e3dde99d7f MINOR: Various cleanups in raft (#16611)
7e3dde99d7f is described below

commit 7e3dde99d7fac8b1eb14dacf4dd8f7137d78bf80
Author: Mickael Maison <[email protected]>
AuthorDate: Thu Jul 18 12:48:29 2024 +0200

    MINOR: Various cleanups in raft (#16611)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../org/apache/kafka/raft/KafkaRaftClient.java     | 26 +++++++++++-----------
 .../java/org/apache/kafka/raft/LeaderState.java    |  2 +-
 .../main/java/org/apache/kafka/raft/RaftUtil.java  |  4 ++--
 .../java/org/apache/kafka/raft/RequestManager.java | 10 ++++-----
 .../org/apache/kafka/raft/UnattachedState.java     |  4 ++--
 .../org/apache/kafka/raft/internals/VoterSet.java  |  5 +----
 .../apache/kafka/raft/KafkaNetworkChannelTest.java |  6 ++---
 .../kafka/raft/KafkaRaftClientReconfigTest.java    |  4 ++--
 .../org/apache/kafka/raft/LeaderStateTest.java     |  2 +-
 .../test/java/org/apache/kafka/raft/MockLog.java   | 16 ++++++-------
 .../java/org/apache/kafka/raft/MockLogTest.java    | 16 +++++--------
 .../apache/kafka/raft/RaftClientTestContext.java   |  8 +++----
 .../apache/kafka/raft/RaftEventSimulationTest.java | 12 +++++-----
 .../org/apache/kafka/raft/UnattachedStateTest.java | 17 ++++----------
 .../kafka/raft/internals/BatchAccumulatorTest.java | 14 ++++++------
 .../kafka/raft/internals/BatchBuilderTest.java     |  3 ++-
 .../kafka/raft/internals/RecordsIteratorTest.java  |  4 ++--
 .../snapshot/NotifyingRawSnapshotWriterTest.java   |  2 +-
 .../kafka/snapshot/RecordsSnapshotWriterTest.java  |  8 +++----
 .../kafka/snapshot/SnapshotWriterReaderTest.java   | 12 +++++-----
 20 files changed, 78 insertions(+), 97 deletions(-)

diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 17bf64f4666..5086532bb7e 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -1206,7 +1206,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
                     .setErrorCode(error.code())
                     .setLogStartOffset(log.startOffset())
                     .setHighWatermark(
-                        highWatermark.map(offsetMetadata -> offsetMetadata.offset()).orElse(-1L)
+                        highWatermark.map(LogOffsetMetadata::offset).orElse(-1L)
                     );
 
                 partitionData.currentLeader()
@@ -1991,7 +1991,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
      * @param leaderId Optional leaderId from the response
      * @param epoch Epoch received from the response
      * @param leaderEndpoints the endpoints of the leader from the response
-     * @param souce the node the sent the response
+     * @param source the node that sent the response
      * @param currentTimeMs Current epoch time in milliseconds
      * @return Optional value indicating whether the error was handled here and the outcome of
      *    that handling. Specifically:
@@ -2012,8 +2012,8 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
         long currentTimeMs
     ) {
         if (leaderEndpoints.isEmpty() && leaderId.isPresent()) {
-            // The response didn't include the leader enpoints because it is from a replica
-            // that doesn't support reconfiguration. Look up the the leader endpoint in the
+            // The response didn't include the leader endpoints because it is from a replica
+            // that doesn't support reconfiguration. Look up the leader endpoint in the
             // voter set.
             leaderEndpoints = partitionState
                 .lastVoterSet()
@@ -2334,7 +2334,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
         return minBackoffMs;
     }
 
-    private long maybeSendRequets(
+    private long maybeSendRequest(
         long currentTimeMs,
         Set<ReplicaKey> remoteVoters,
         Function<Integer, Node> destinationSupplier,
@@ -2459,9 +2459,9 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
                     double elapsedTimePerRecord = (double) elapsedTime / batch.numRecords;
                     kafkaRaftMetrics.updateCommitLatency(elapsedTimePerRecord, appendTimeMs);
                     logger.debug("Completed commit of {} records up to last offset {}", batch.numRecords, offsetAndEpoch);
-                    batch.records.ifPresent(records -> {
-                        maybeFireHandleCommit(batch.baseOffset, epoch, batch.appendTimestamp(), batch.sizeInBytes(), records);
-                    });
+                    batch.records.ifPresent(records ->
+                        maybeFireHandleCommit(batch.baseOffset, epoch, batch.appendTimestamp(), batch.sizeInBytes(), records)
+                    );
                 }
             });
         } finally {
@@ -2501,7 +2501,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
         long timeUntilNextBeginQuorumSend = state.timeUntilBeginQuorumEpochTimerExpires(currentTimeMs);
         if (timeUntilNextBeginQuorumSend == 0) {
             VoterSet voters = partitionState.lastVoterSet();
-            timeUntilNextBeginQuorumSend = maybeSendRequets(
+            timeUntilNextBeginQuorumSend = maybeSendRequest(
                 currentTimeMs,
                 voters
                     .voterKeys()
@@ -2587,7 +2587,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
         // Continue sending Vote requests as long as we still have a chance to win the election
         if (!state.isVoteRejected()) {
             VoterSet voters = partitionState.lastVoterSet();
-            return maybeSendRequets(
+            return maybeSendRequest(
                 currentTimeMs,
                 state.unrecordedVoters(),
                 voterId -> voters
@@ -2783,9 +2783,9 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
         }
 
         // Check listener progress to see if reads are expected
-        quorum.highWatermark().ifPresent(highWatermarkMetadata -> {
-            updateListenersProgress(highWatermarkMetadata.offset());
-        });
+        quorum.highWatermark().ifPresent(highWatermarkMetadata ->
+            updateListenersProgress(highWatermarkMetadata.offset())
+        );
 
         // Notify the new listeners of the latest leader and epoch
         Optional<LeaderState<T>> leaderState = quorum.maybeLeaderState();
diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
index 9cdfbbf3d84..2a91b3591bb 100644
--- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java
@@ -239,7 +239,7 @@ public class LeaderState<T> implements EpochState {
                 offsetOfVotersAtEpochStart.ifPresent(offset -> {
                     if (offset == -1) {
                         // Latest voter set came from the bootstrap checkpoint (0-0.checkpoint)
-                        // rewrite the voter set to the log so that it is replcated to the replicas.
+                        // rewrite the voter set to the log so that it is replicated to the replicas.
                         if (!kraftVersionAtEpochStart.isReconfigSupported()) {
                             throw new IllegalStateException(
                                 String.format(
diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
index c28864b2f2b..816bd7a95ae 100644
--- a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
+++ b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
@@ -301,7 +301,7 @@ public class RaftUtil {
         String clusterId,
         int leaderEpoch,
         int leaderId,
-        Endpoints leaderEndponts,
+        Endpoints leaderEndpoints,
         ReplicaKey voterKey
     ) {
         return new BeginQuorumEpochRequestData()
@@ -322,7 +322,7 @@ public class RaftUtil {
                         )
                 )
             )
-            .setLeaderEndpoints(leaderEndponts.toBeginQuorumEpochRequest());
+            .setLeaderEndpoints(leaderEndpoints.toBeginQuorumEpochRequest());
     }
 
     public static BeginQuorumEpochResponseData singletonBeginQuorumEpochResponse(
diff --git a/raft/src/main/java/org/apache/kafka/raft/RequestManager.java b/raft/src/main/java/org/apache/kafka/raft/RequestManager.java
index 72eab680777..809cf7a50ae 100644
--- a/raft/src/main/java/org/apache/kafka/raft/RequestManager.java
+++ b/raft/src/main/java/org/apache/kafka/raft/RequestManager.java
@@ -38,7 +38,7 @@ import java.util.Random;
  * When a request times out or completes successfully the collection will transition back to the
  * ready state.
  *
- * When a request completes with an error it still transition to the backoff state until
+ * When a request completes with an error it still transitions to the backoff state until
  * {@code retryBackoffMs}.
  */
 public class RequestManager {
@@ -62,7 +62,7 @@ public class RequestManager {
     }
 
     /**
-     * Returns true if there any connection with pending requests.
+     * Returns true if there are any connections with pending requests.
      *
      * This is useful for satisfying the invariant that there is only one pending Fetch request.
      * If there are more than one pending fetch request, it is possible for the follower to write
@@ -105,7 +105,7 @@ public class RequestManager {
      * @return a random ready bootstrap node
      */
     public Optional<Node> findReadyBootstrapServer(long currentTimeMs) {
-        // Check that there are no infilght requests accross any of the known nodes not just
+        // Check that there are no inflight requests across any of the known nodes not just
         // the bootstrap servers
         if (hasAnyInflightRequest(currentTimeMs)) {
             return Optional.empty();
@@ -133,7 +133,7 @@ public class RequestManager {
      * If there is a connection with a pending request it returns the amount of time to wait until
      * the request times out.
      *
-     * Returns zero, if there are no pending requests and at least one of the boorstrap servers is
+     * Returns zero, if there are no pending requests and at least one of the bootstrap servers is
      * ready.
      *
      * If all of the bootstrap servers are backing off and there are no pending requests, return
@@ -243,7 +243,7 @@ public class RequestManager {
     public void onResponseResult(Node node, long correlationId, boolean success, long timeMs) {
         if (isResponseExpected(node, correlationId)) {
             if (success) {
-                // Mark the connection as ready by reseting it
+                // Mark the connection as ready by resetting it
                 reset(node);
             } else {
                 // Backoff the connection
diff --git a/raft/src/main/java/org/apache/kafka/raft/UnattachedState.java b/raft/src/main/java/org/apache/kafka/raft/UnattachedState.java
index 3c4680a8526..1dba6a70f28 100644
--- a/raft/src/main/java/org/apache/kafka/raft/UnattachedState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/UnattachedState.java
@@ -28,14 +28,14 @@ import java.util.OptionalInt;
 import java.util.Set;
 
 /**
- * A replica is "unattached" when it doesn't known the leader or the leader's endpoint.
+ * A replica is "unattached" when it doesn't know the leader or the leader's endpoint.
  *
  * Typically, a replica doesn't know the leader if the KRaft topic is undergoing an election cycle.
  *
  * It is also possible for a replica to be unattached if it doesn't know the leader's endpoint.
  * This typically happens when a replica starts up and the known leader id is not part of the local
  * voter set. In that case, during startup the replica transitions to unattached instead of
- * transitioning to follower. The unattched replica discovers the leader and leader's endpoint
+ * transitioning to follower. The unattached replica discovers the leader and leader's endpoint
  * either through random Fetch requests to the bootstrap servers or through BeginQuorumEpoch
  * request from the leader.
  */
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java
index 2f961897982..cca955d41e3 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java
@@ -145,10 +145,7 @@ public final class VoterSet {
      * Returns all of the voters.
      */
     public Set<VoterNode> voterNodes() {
-        return voters
-            .values()
-            .stream()
-            .collect(Collectors.toSet());
+        return new HashSet<>(voters.values());
     }
 
     /**
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java
index 00000ec7d81..579508a2569 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaNetworkChannelTest.java
@@ -285,12 +285,12 @@ public class KafkaNetworkChannelTest {
                 return VoteRequest.singletonRequest(topicPartition, clusterId, leaderEpoch, leaderId, lastEpoch, 329);
 
             case FETCH:
-                FetchRequestData request = RaftUtil.singletonFetchRequest(topicPartition, topicId, fetchPartition -> {
+                FetchRequestData request = RaftUtil.singletonFetchRequest(topicPartition, topicId, fetchPartition ->
                     fetchPartition
                         .setCurrentLeaderEpoch(5)
                         .setFetchOffset(333)
-                        .setLastFetchedEpoch(5);
-                });
+                        .setLastFetchedEpoch(5)
+                );
                 request.setReplicaState(new FetchRequestData.ReplicaState().setReplicaId(1));
                 return request;
 
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
index 84a9af918b6..678780db441 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
@@ -302,13 +302,13 @@ public class KafkaRaftClientReconfigTest {
                 new LeaderChangeMessage()
             );
             builder.appendKRaftVersionMessage(
-                0, // timesteamp
+                0, // timestamp
                 new KRaftVersionRecord()
                     .setVersion(ControlRecordUtils.KRAFT_VERSION_CURRENT_VERSION)
                     .setKRaftVersion((short) 1)
             );
             builder.appendVotersMessage(
-                0, // timesteamp
+                0, // timestamp
                 leadersVoterSet.toVotersRecord(ControlRecordUtils.KRAFT_VOTERS_CURRENT_VERSION)
             );
             MemoryRecords leaderRecords = builder.build();
diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
index a423d416c8b..dfb5469434d 100644
--- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
@@ -539,7 +539,7 @@ public class LeaderStateTest {
         assertFalse(state.updateReplicaState(nodeKey2, 0, new LogOffsetMetadata(15L)));
         assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark());
 
-        // HW will not update to 16L until majority of remaining voterSet (node1, node2) are at least 16L
+        // HW will not update to 16L until the majority of remaining voterSet (node1, node2) are at least 16L
         assertTrue(state.updateReplicaState(nodeKey2, 0, new LogOffsetMetadata(16L)));
         assertEquals(Optional.of(new LogOffsetMetadata(16L)), state.highWatermark());
     }
diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLog.java b/raft/src/test/java/org/apache/kafka/raft/MockLog.java
index b8c2089caf2..29281fa633f 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockLog.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockLog.java
@@ -499,9 +499,9 @@ public class MockLog implements ReplicatedLog {
             return Optional.empty();
         } else {
             return Optional.of(
-                new MockRawSnapshotWriter(snapshotId, buffer -> {
-                    snapshots.putIfAbsent(snapshotId, new MockRawSnapshotReader(snapshotId, buffer));
-                })
+                new MockRawSnapshotWriter(snapshotId, buffer ->
+                    snapshots.putIfAbsent(snapshotId, new MockRawSnapshotReader(snapshotId, buffer))
+                )
             );
         }
     }
@@ -569,12 +569,10 @@ public class MockLog implements ReplicatedLog {
                 return false;
             });
 
-            last.get().ifPresent(epochStartOffset -> {
-                epochStartOffsets.add(
-                    0,
-                    new EpochStartOffset(epochStartOffset.epoch, snapshotId.offset())
-                );
-            });
+            last.get().ifPresent(epochStartOffset ->
+                epochStartOffsets.add(0, new EpochStartOffset(epochStartOffset.epoch, snapshotId.offset())
+                )
+            );
 
             updated = true;
         }
diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
index 0c389a41522..08e19866d9b 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
@@ -168,21 +168,15 @@ public class MockLogTest {
         // Throw exception for out of order records
         assertThrows(
             RuntimeException.class,
-            () -> {
-                log.appendAsLeader(
+            () -> log.appendAsLeader(
                     MemoryRecords.withRecords(initialOffset, Compression.NONE, currentEpoch, recordFoo),
-                    currentEpoch
-                );
-            }
+                    currentEpoch)
         );
 
         assertThrows(
             RuntimeException.class,
-            () -> {
-                log.appendAsFollower(
-                    MemoryRecords.withRecords(initialOffset, Compression.NONE, currentEpoch, recordFoo)
-                );
-            }
+            () -> log.appendAsFollower(
+                    MemoryRecords.withRecords(initialOffset, Compression.NONE, currentEpoch, recordFoo))
         );
     }
 
@@ -513,7 +507,7 @@ public class MockLogTest {
     }
 
     @Test
-    public void testCreateSnapshotMuchEalierEpoch() {
+    public void testCreateSnapshotMuchEarlierEpoch() {
         int numberOfRecords = 10;
         int epoch = 2;
         OffsetAndEpoch snapshotId = new OffsetAndEpoch(numberOfRecords, epoch);
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index 60147c07888..4b80eea2534 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -1097,9 +1097,9 @@ public final class RaftClientTestContext {
 
                 assertEquals(epoch, partitionRequest.leaderEpoch());
                 assertEquals(localIdOrThrow(), partitionRequest.leaderId());
-                preferredSuccessorsOpt.ifPresent(preferredSuccessors -> {
-                    assertEquals(preferredSuccessors, partitionRequest.preferredSuccessors());
-                });
+                preferredSuccessorsOpt.ifPresent(preferredSuccessors ->
+                    assertEquals(preferredSuccessors, partitionRequest.preferredSuccessors())
+                );
 
                 collectedDestinationIdSet.add(raftMessage.destination().id());
                 endQuorumRequests.add(raftMessage);
@@ -1709,7 +1709,7 @@ public final class RaftClientTestContext {
             return commits.stream()
                 .filter(batch -> batch.lastOffset() == lastOffset)
                 .findFirst()
-                .map(batch -> batch.records())
+                .map(Batch::records)
                 .orElse(null);
         }
 
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
index c57c441b98b..5954379cf96 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
@@ -996,9 +996,9 @@ public class RaftEventSimulationTest {
                     fail("Non-monotonic update of epoch detected on node " + nodeId + ": " +
                             oldEpoch + " -> " + newEpoch);
                 }
-                cluster.ifRunning(nodeId, nodeState -> {
-                    assertEquals(newEpoch, nodeState.client.quorum().epoch());
-                });
+                cluster.ifRunning(nodeId, nodeState ->
+                    assertEquals(newEpoch, nodeState.client.quorum().epoch())
+                );
                 nodeEpochs.put(nodeId, newEpoch);
             }
         }
@@ -1300,9 +1300,9 @@ public class RaftEventSimulationTest {
             if (!filters.get(inflightRequest.sourceId).acceptInbound(inbound))
                 return;
 
-            cluster.nodeIfRunning(inflightRequest.sourceId).ifPresent(node -> {
-                node.channel.mockReceive(inbound);
-            });
+            cluster.nodeIfRunning(inflightRequest.sourceId).ifPresent(node ->
+                node.channel.mockReceive(inbound)
+            );
         }
 
         void filter(int nodeId, NetworkFilter filter) {
diff --git a/raft/src/test/java/org/apache/kafka/raft/UnattachedStateTest.java b/raft/src/test/java/org/apache/kafka/raft/UnattachedStateTest.java
index cd8bbb85678..38033c14ef0 100644
--- a/raft/src/test/java/org/apache/kafka/raft/UnattachedStateTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/UnattachedStateTest.java
@@ -113,18 +113,9 @@ public class UnattachedStateTest {
         // Check that the leader is persisted if the leader is known
         assertEquals(ElectionState.withElectedLeader(epoch, leaderId, voters), state.election());
 
-        // Check that the replcia rejects all votes request if the leader is known
-        assertEquals(
-            false,
-            state.canGrantVote(ReplicaKey.of(1, ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate)
-        );
-        assertEquals(
-            false,
-            state.canGrantVote(ReplicaKey.of(2, ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate)
-        );
-        assertEquals(
-            false,
-            state.canGrantVote(ReplicaKey.of(3, ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate)
-        );
+        // Check that the replica rejects all votes request if the leader is known
+        assertFalse(state.canGrantVote(ReplicaKey.of(1, ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate));
+        assertFalse(state.canGrantVote(ReplicaKey.of(2, ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate));
+        assertFalse(state.canGrantVote(ReplicaKey.of(3, ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate));
     }
 }
diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java
index 8a4b774bae7..af2d5822ed5 100644
--- a/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java
@@ -494,10 +494,10 @@ class BatchAccumulatorTest {
         List<BatchAccumulator.CompletedBatch<String>> drained = acc.drain();
         assertEquals(1, drained.size());
         assertEquals(Long.MAX_VALUE - time.milliseconds(), acc.timeUntilDrain(time.milliseconds()));
-        drained.forEach(completedBatch -> {
-            completedBatch.data.batches().forEach(recordBatch -> {
-                assertEquals(leaderEpoch, recordBatch.partitionLeaderEpoch()); });
-        });
+        drained.forEach(completedBatch ->
+            completedBatch.data.batches().forEach(recordBatch ->
+                assertEquals(leaderEpoch, recordBatch.partitionLeaderEpoch()))
+        );
 
         acc.close();
     }
@@ -552,12 +552,12 @@ class BatchAccumulatorTest {
                     true));
         } else {
             assertEquals("Wanted base offset 156, but the next offset was 157",
-                assertThrows(UnexpectedBaseOffsetException.class, () -> {
+                assertThrows(UnexpectedBaseOffsetException.class, () ->
                     acc.append(leaderEpoch,
                         singletonList("a"),
                         OptionalLong.of(baseOffset - 1),
-                        true);
-                }).getMessage());
+                        true)
+                ).getMessage());
         }
         acc.close();
     }
diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/BatchBuilderTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/BatchBuilderTest.java
index ef0a648beaf..ccb28f45477 100644
--- a/raft/src/test/java/org/apache/kafka/raft/internals/BatchBuilderTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/internals/BatchBuilderTest.java
@@ -34,6 +34,7 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -81,7 +82,7 @@ class BatchBuilderTest {
         assertEquals(compressionType, batch.compressionType());
         assertEquals(baseOffset, batch.baseOffset());
         assertEquals(logAppendTime, batch.maxTimestamp());
-        assertEquals(false, batch.isControlBatch());
+        assertFalse(batch.isControlBatch());
         assertEquals(leaderEpoch, batch.partitionLeaderEpoch());
 
         List<String> builtRecords = Utils.toList(batch).stream()
diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java
index 3e990037bb8..a8d34ec5b33 100644
--- a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java
@@ -166,7 +166,7 @@ public final class RecordsIteratorTest {
             .setKraftVersion(KRaftVersion.KRAFT_VERSION_0)
             .setVoterSet(Optional.empty())
             .setRawSnapshotWriter(
-                new MockRawSnapshotWriter(new OffsetAndEpoch(100, 10), snapshotBuf -> buffer.set(snapshotBuf))
+                new MockRawSnapshotWriter(new OffsetAndEpoch(100, 10), buffer::set)
             );
         try (RecordsSnapshotWriter<String> snapshot = builder.build(STRING_SERDE)) {
             snapshot.append(Arrays.asList("a", "b", "c"));
@@ -216,7 +216,7 @@ public final class RecordsIteratorTest {
             .setKraftVersion(KRaftVersion.KRAFT_VERSION_1)
             .setVoterSet(Optional.of(voterSet))
             .setRawSnapshotWriter(
-                new MockRawSnapshotWriter(new OffsetAndEpoch(100, 10), snapshotBuf -> buffer.set(snapshotBuf))
+                new MockRawSnapshotWriter(new OffsetAndEpoch(100, 10), buffer::set)
             );
         try (RecordsSnapshotWriter<String> snapshot = builder.build(STRING_SERDE)) {
             snapshot.append(Arrays.asList("a", "b", "c"));
diff --git a/raft/src/test/java/org/apache/kafka/snapshot/NotifyingRawSnapshotWriterTest.java b/raft/src/test/java/org/apache/kafka/snapshot/NotifyingRawSnapshotWriterTest.java
index 7e9de94c589..50c0fa0408b 100644
--- a/raft/src/test/java/org/apache/kafka/snapshot/NotifyingRawSnapshotWriterTest.java
+++ b/raft/src/test/java/org/apache/kafka/snapshot/NotifyingRawSnapshotWriterTest.java
@@ -86,7 +86,7 @@ final class NotifyingRawSnapshotWriterTest {
         assertTrue(rawWriter.closed);
     }
 
-    class NoopRawSnapshotWriter implements RawSnapshotWriter {
+    static class NoopRawSnapshotWriter implements RawSnapshotWriter {
         boolean frozen = false;
         boolean closed = false;
 
diff --git a/raft/src/test/java/org/apache/kafka/snapshot/RecordsSnapshotWriterTest.java b/raft/src/test/java/org/apache/kafka/snapshot/RecordsSnapshotWriterTest.java
index 60752ccdbbd..8a5d384462f 100644
--- a/raft/src/test/java/org/apache/kafka/snapshot/RecordsSnapshotWriterTest.java
+++ b/raft/src/test/java/org/apache/kafka/snapshot/RecordsSnapshotWriterTest.java
@@ -57,7 +57,7 @@ final class RecordsSnapshotWriterTest {
             .setTime(new MockTime())
             .setMaxBatchSize(maxBatchSize)
             .setRawSnapshotWriter(
-                new MockRawSnapshotWriter(snapshotId, snapshotBuf -> buffer.set(snapshotBuf))
+                new MockRawSnapshotWriter(snapshotId, buffer::set)
             );
         try (RecordsSnapshotWriter<String> snapshot = builder.build(STRING_SERDE)) {
             snapshot.freeze();
@@ -109,7 +109,7 @@ final class RecordsSnapshotWriterTest {
             .setTime(new MockTime())
             .setMaxBatchSize(maxBatchSize)
             .setRawSnapshotWriter(
-                new MockRawSnapshotWriter(snapshotId, snapshotBuf -> buffer.set(snapshotBuf))
+                new MockRawSnapshotWriter(snapshotId, buffer::set)
             );
 
         assertThrows(IllegalStateException.class, () -> builder.build(STRING_SERDE));
@@ -129,7 +129,7 @@ final class RecordsSnapshotWriterTest {
             .setTime(new MockTime())
             .setMaxBatchSize(maxBatchSize)
             .setRawSnapshotWriter(
-                new MockRawSnapshotWriter(snapshotId, snapshotBuf -> buffer.set(snapshotBuf))
+                new MockRawSnapshotWriter(snapshotId, buffer::set)
             );
         try (RecordsSnapshotWriter<String> snapshot = builder.build(STRING_SERDE)) {
             snapshot.freeze();
@@ -186,7 +186,7 @@ final class RecordsSnapshotWriterTest {
             .setTime(new MockTime())
             .setMaxBatchSize(maxBatchSize)
             .setRawSnapshotWriter(
-                new MockRawSnapshotWriter(snapshotId, snapshotBuf -> buffer.set(snapshotBuf))
+                new MockRawSnapshotWriter(snapshotId, buffer::set)
             );
         try (RecordsSnapshotWriter<String> snapshot = builder.build(STRING_SERDE)) {
             snapshot.freeze();
diff --git a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java
index e0243740c95..fd8a136a9e6 100644
--- a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java
+++ b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java
@@ -136,9 +136,9 @@ public final class SnapshotWriterReaderTest {
 
         try (SnapshotWriter<String> snapshot = context.client.createSnapshot(id, 0).get()) {
             assertEquals(id, snapshot.snapshotId());
-            expected.forEach(batch -> {
-                assertDoesNotThrow(() -> snapshot.append(batch));
-            });
+            expected.forEach(batch ->
+                assertDoesNotThrow(() -> snapshot.append(batch))
+            );
         }
 
         assertEquals(Optional.empty(), context.log.readSnapshot(id));
@@ -163,9 +163,9 @@ public final class SnapshotWriterReaderTest {
 
         try (SnapshotWriter<String> snapshot = context.client.createSnapshot(id, 0).get()) {
             assertEquals(id, snapshot.snapshotId());
-            expected.forEach(batch -> {
-                assertDoesNotThrow(() -> snapshot.append(batch));
-            });
+            expected.forEach(batch ->
+                assertDoesNotThrow(() -> snapshot.append(batch))
+            );
 
             snapshot.freeze();
 
