This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bc68011b62d MINOR: Various cleanups in CoordinatorRuntimeTest (#17829)
bc68011b62d is described below

commit bc68011b62df49c2b5f9393f573efcc09dd304de
Author: David Jacot <[email protected]>
AuthorDate: Mon Nov 18 10:30:54 2024 +0100

    MINOR: Various cleanups in CoordinatorRuntimeTest (#17829)
    
    
    Reviewers: Mickael Maison <[email protected]>
---
 .../common/runtime/CoordinatorRuntimeTest.java     | 300 ++++++++++-----------
 1 file changed, 145 insertions(+), 155 deletions(-)

diff --git 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
index 7172fb2e899..ace0d0f7aac 100644
--- 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
+++ 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
@@ -143,7 +143,7 @@ public class CoordinatorRuntimeTest {
         }
 
         @Override
-        public void close() throws Exception {}
+        public void close() {}
     }
 
     /**
@@ -181,7 +181,7 @@ public class CoordinatorRuntimeTest {
         }
 
         @Override
-        public void close() throws Exception {
+        public void close() {
 
         }
     }
@@ -219,7 +219,7 @@ public class CoordinatorRuntimeTest {
         }
 
         @Override
-        public void close() throws Exception { }
+        public void close() { }
     }
 
     /**
@@ -403,28 +403,18 @@ public class CoordinatorRuntimeTest {
         Set<String> pendingRecords(long producerId) {
             TimelineHashSet<RecordAndMetadata> pending = 
pendingRecords.get(producerId);
             if (pending == null) return Collections.emptySet();
-            return Collections.unmodifiableSet(
-                pending.stream().map(record -> 
record.record).collect(Collectors.toSet())
-            );
+            return pending.stream().map(record -> 
record.record).collect(Collectors.toUnmodifiableSet());
         }
 
         Set<String> records() {
-            return Collections.unmodifiableSet(
-                records.stream().map(record -> 
record.record).collect(Collectors.toSet())
-            );
+            return records.stream().map(record -> 
record.record).collect(Collectors.toUnmodifiableSet());
         }
 
         List<RecordAndMetadata> fullRecords() {
-            return Collections.unmodifiableList(
-                records
-                    .stream()
-                    .sorted(Comparator.comparingLong(record -> record.offset))
-                    .collect(Collectors.toList())
-            );
-        }
-
-        CoordinatorTimer<Void, String> timer() {
-            return timer;
+            return records
+                .stream()
+                .sorted(Comparator.comparingLong(record -> record.offset))
+                .collect(Collectors.toList());
         }
     }
 
@@ -1081,11 +1071,11 @@ public class CoordinatorRuntimeTest {
         CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
         assertEquals(0L, ctx.coordinator.lastWrittenOffset());
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
-        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
 
         // Write #1.
         CompletableFuture<String> write1 = 
runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
-            state -> new CoordinatorResult<>(Arrays.asList("record1", 
"record2"), "response1")
+            state -> new CoordinatorResult<>(List.of("record1", "record2"), 
"response1")
         );
 
         // Verify that the write is not committed yet.
@@ -1096,17 +1086,17 @@ public class CoordinatorRuntimeTest {
         // The last committed offset does not change.
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
         // A new snapshot is created.
-        assertEquals(Arrays.asList(0L, 2L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(0L, 2L), 
ctx.coordinator.snapshotRegistry().epochsList());
         // Records have been replayed to the coordinator.
         assertEquals(Set.of("record1", "record2"), 
ctx.coordinator.coordinator().records());
         // Records have been written to the log.
-        assertEquals(Collections.singletonList(
+        assertEquals(List.of(
             records(timer.time().milliseconds(), "record1", "record2")
         ), writer.entries(TP));
 
         // Write #2.
         CompletableFuture<String> write2 = 
runtime.scheduleWriteOperation("write#2", TP, DEFAULT_WRITE_TIMEOUT,
-            state -> new 
CoordinatorResult<>(Collections.singletonList("record3"), "response2"));
+            state -> new CoordinatorResult<>(List.of("record3"), "response2"));
 
         // Verify that the write is not committed yet.
         assertFalse(write2.isDone());
@@ -1116,11 +1106,11 @@ public class CoordinatorRuntimeTest {
         // The last committed offset does not change.
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
         // A new snapshot is created.
-        assertEquals(Arrays.asList(0L, 2L, 3L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(0L, 2L, 3L), 
ctx.coordinator.snapshotRegistry().epochsList());
         // Records have been replayed to the coordinator.
         assertEquals(Set.of("record1", "record2", "record3"), 
ctx.coordinator.coordinator().records());
         // Records have been written to the log.
-        assertEquals(Arrays.asList(
+        assertEquals(List.of(
             records(timer.time().milliseconds(), "record1", "record2"),
             records(timer.time().milliseconds(), "record3")
         ), writer.entries(TP));
@@ -1135,9 +1125,9 @@ public class CoordinatorRuntimeTest {
         // The state does not change.
         assertEquals(3L, ctx.coordinator.lastWrittenOffset());
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
-        assertEquals(Arrays.asList(0L, 2L, 3L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(0L, 2L, 3L), 
ctx.coordinator.snapshotRegistry().epochsList());
         assertEquals(Set.of("record1", "record2", "record3"), 
ctx.coordinator.coordinator().records());
-        assertEquals(Arrays.asList(
+        assertEquals(List.of(
             records(timer.time().milliseconds(), "record1", "record2"),
             records(timer.time().milliseconds(), "record3")
         ), writer.entries(TP));
@@ -1152,7 +1142,7 @@ public class CoordinatorRuntimeTest {
         // The last committed offset is updated.
         assertEquals(2L, ctx.coordinator.lastCommittedOffset());
         // The snapshot is cleaned up.
-        assertEquals(Arrays.asList(2L, 3L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(2L, 3L), 
ctx.coordinator.snapshotRegistry().epochsList());
 
         // Commit write #2.
         writer.commit(TP, 3);
@@ -1166,7 +1156,7 @@ public class CoordinatorRuntimeTest {
         // The last committed offset is updated.
         assertEquals(3L, ctx.coordinator.lastCommittedOffset());
         // The snapshot is cleaned up.
-        assertEquals(Collections.singletonList(3L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(3L), 
ctx.coordinator.snapshotRegistry().epochsList());
 
         // Write #4 but without records.
         CompletableFuture<String> write4 = 
runtime.scheduleWriteOperation("write#4", TP, DEFAULT_WRITE_TIMEOUT,
@@ -1175,7 +1165,7 @@ public class CoordinatorRuntimeTest {
         // It is completed immediately because the state is fully committed.
         assertTrue(write4.isDone());
         assertEquals("response4", write4.get(5, TimeUnit.SECONDS));
-        assertEquals(Collections.singletonList(3L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(3L), 
ctx.coordinator.snapshotRegistry().epochsList());
     }
 
     @Test
@@ -1254,7 +1244,7 @@ public class CoordinatorRuntimeTest {
         CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
         assertEquals(0L, ctx.coordinator.lastWrittenOffset());
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
-        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
 
         // Override the coordinator with a coordinator that throws
         // an exception when replay is called.
@@ -1278,13 +1268,13 @@ public class CoordinatorRuntimeTest {
 
         // Write. It should fail.
         CompletableFuture<String> write = 
runtime.scheduleWriteOperation("write", TP, DEFAULT_WRITE_TIMEOUT,
-            state -> new CoordinatorResult<>(Arrays.asList("record1", 
"record2"), "response1"));
+            state -> new CoordinatorResult<>(List.of("record1", "record2"), 
"response1"));
         assertFutureThrows(write, IllegalArgumentException.class);
 
         // Verify that the state has not changed.
         assertEquals(0L, ctx.coordinator.lastWrittenOffset());
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
-        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
     }
 
     @Test
@@ -1314,28 +1304,28 @@ public class CoordinatorRuntimeTest {
         CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
         assertEquals(0, ctx.coordinator.lastWrittenOffset());
         assertEquals(0, ctx.coordinator.lastCommittedOffset());
-        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
 
         // Write #1. It should succeed and be applied to the coordinator.
         runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
-            state -> new CoordinatorResult<>(Arrays.asList("record1", 
"record2"), "response1"));
+            state -> new CoordinatorResult<>(List.of("record1", "record2"), 
"response1"));
 
         // Verify that the state has been updated.
         assertEquals(2L, ctx.coordinator.lastWrittenOffset());
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
-        assertEquals(Arrays.asList(0L, 2L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(0L, 2L), 
ctx.coordinator.snapshotRegistry().epochsList());
         assertEquals(Set.of("record1", "record2"), 
ctx.coordinator.coordinator().records());
 
         // Write #2. It should fail because the writer is configured to only
         // accept 1 write.
         CompletableFuture<String> write2 = 
runtime.scheduleWriteOperation("write#2", TP, DEFAULT_WRITE_TIMEOUT,
-            state -> new CoordinatorResult<>(Arrays.asList("record3", 
"record4", "record5"), "response2"));
+            state -> new CoordinatorResult<>(List.of("record3", "record4", 
"record5"), "response2"));
         assertFutureThrows(write2, KafkaException.class);
 
         // Verify that the state has not changed.
         assertEquals(2L, ctx.coordinator.lastWrittenOffset());
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
-        assertEquals(Arrays.asList(0L, 2L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(0L, 2L), 
ctx.coordinator.snapshotRegistry().epochsList());
         assertEquals(Set.of("record1", "record2"), 
ctx.coordinator.coordinator().records());
     }
 
@@ -1366,11 +1356,11 @@ public class CoordinatorRuntimeTest {
         CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
         assertEquals(0, ctx.coordinator.lastWrittenOffset());
         assertEquals(0, ctx.coordinator.lastCommittedOffset());
-        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
 
         // Write #1. We should get a TimeoutException because the HWM will not 
advance.
         CompletableFuture<String> timedOutWrite = 
runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(3),
-            state -> new CoordinatorResult<>(Arrays.asList("record1", 
"record2"), "response1"));
+            state -> new CoordinatorResult<>(List.of("record1", "record2"), 
"response1"));
 
         timer.advanceClock(4);
 
@@ -1410,8 +1400,8 @@ public class CoordinatorRuntimeTest {
         List<CompletableFuture<List<String>>> writes = 
runtime.scheduleWriteAllOperation("write", DEFAULT_WRITE_TIMEOUT, state -> {
             int counter = cnt.getAndIncrement();
             return new CoordinatorResult<>(
-                Collections.singletonList("record#" + counter),
-                Collections.singletonList("response#" + counter)
+                List.of("record#" + counter),
+                List.of("response#" + counter)
             );
         });
 
@@ -1419,9 +1409,9 @@ public class CoordinatorRuntimeTest {
         assertEquals(1L, 
runtime.contextOrThrow(coordinator1).coordinator.lastWrittenOffset());
         assertEquals(1L, 
runtime.contextOrThrow(coordinator2).coordinator.lastWrittenOffset());
 
-        
assertEquals(Collections.singletonList(records(timer.time().milliseconds(), 
"record#0")), writer.entries(coordinator0));
-        
assertEquals(Collections.singletonList(records(timer.time().milliseconds(), 
"record#1")), writer.entries(coordinator1));
-        
assertEquals(Collections.singletonList(records(timer.time().milliseconds(), 
"record#2")), writer.entries(coordinator2));
+        assertEquals(List.of(records(timer.time().milliseconds(), 
"record#0")), writer.entries(coordinator0));
+        assertEquals(List.of(records(timer.time().milliseconds(), 
"record#1")), writer.entries(coordinator1));
+        assertEquals(List.of(records(timer.time().milliseconds(), 
"record#2")), writer.entries(coordinator2));
 
         // Commit.
         writer.commit(coordinator0);
@@ -1430,7 +1420,7 @@ public class CoordinatorRuntimeTest {
 
         // Verify.
         assertEquals(
-            Arrays.asList("response#0", "response#1", "response#2"),
+            List.of("response#0", "response#1", "response#2"),
             FutureUtils.combineFutures(writes, ArrayList::new, 
List::addAll).get(5, TimeUnit.SECONDS)
         );
     }
@@ -1494,7 +1484,7 @@ public class CoordinatorRuntimeTest {
             100L,
             (short) 50,
             Duration.ofMillis(5000),
-            state -> new CoordinatorResult<>(Arrays.asList("record1", 
"record2"), "response"),
+            state -> new CoordinatorResult<>(List.of("record1", "record2"), 
"response"),
             TXN_OFFSET_COMMIT_LATEST_VERSION
         );
 
@@ -1583,7 +1573,7 @@ public class CoordinatorRuntimeTest {
             100L,
             (short) 50,
             Duration.ofMillis(5000),
-            state -> new CoordinatorResult<>(Arrays.asList("record1", 
"record2"), "response"),
+            state -> new CoordinatorResult<>(List.of("record1", "record2"), 
"response"),
             TXN_OFFSET_COMMIT_LATEST_VERSION
         );
 
@@ -1625,7 +1615,7 @@ public class CoordinatorRuntimeTest {
         CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
         assertEquals(0L, ctx.coordinator.lastWrittenOffset());
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
-        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
 
         // Transactional write #1.
         CompletableFuture<String> write1 = 
runtime.scheduleTransactionalWriteOperation(
@@ -1635,7 +1625,7 @@ public class CoordinatorRuntimeTest {
             100L,
             (short) 5,
             DEFAULT_WRITE_TIMEOUT,
-            state -> new CoordinatorResult<>(Arrays.asList("record1", 
"record2"), "response1"),
+            state -> new CoordinatorResult<>(List.of("record1", "record2"), 
"response1"),
             TXN_OFFSET_COMMIT_LATEST_VERSION
         );
 
@@ -1647,14 +1637,14 @@ public class CoordinatorRuntimeTest {
         // The last committed offset does not change.
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
         // A new snapshot is created.
-        assertEquals(Arrays.asList(0L, 2L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(0L, 2L), 
ctx.coordinator.snapshotRegistry().epochsList());
         // Records have been replayed to the coordinator. They are stored in
         // the pending set for now.
         assertEquals(Set.of("record1", "record2"), 
ctx.coordinator.coordinator().pendingRecords(
             100L
         ));
         // Records have been written to the log.
-        assertEquals(Collections.singletonList(
+        assertEquals(List.of(
             transactionalRecords(100L, (short) 5, timer.time().milliseconds(), 
"record1", "record2")
         ), writer.entries(TP));
 
@@ -1677,7 +1667,7 @@ public class CoordinatorRuntimeTest {
         // The last committed offset does not change.
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
         // A new snapshot is created.
-        assertEquals(Arrays.asList(0L, 2L, 3L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(0L, 2L, 3L), 
ctx.coordinator.snapshotRegistry().epochsList());
         // Records have been replayed to the coordinator.
         ControlRecordType expectedType;
         if (result == TransactionResult.COMMIT) {
@@ -1691,7 +1681,7 @@ public class CoordinatorRuntimeTest {
         }
 
         // Records have been written to the log.
-        assertEquals(Arrays.asList(
+        assertEquals(List.of(
             transactionalRecords(100L, (short) 5, timer.time().milliseconds(), 
"record1", "record2"),
             endTransactionMarker(100L, (short) 5, timer.time().milliseconds(), 
10, expectedType)
         ), writer.entries(TP));
@@ -1737,7 +1727,7 @@ public class CoordinatorRuntimeTest {
         CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
         assertEquals(0, ctx.coordinator.lastWrittenOffset());
         assertEquals(0, ctx.coordinator.lastCommittedOffset());
-        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
 
         // Complete #1. We should get a TimeoutException because the HWM will 
not advance.
         CompletableFuture<Void> timedOutCompletion = 
runtime.scheduleTransactionCompletion(
@@ -1753,7 +1743,7 @@ public class CoordinatorRuntimeTest {
         // Verify that the state has been updated.
         assertEquals(1L, ctx.coordinator.lastWrittenOffset());
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
-        assertEquals(Arrays.asList(0L, 1L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(0L, 1L), 
ctx.coordinator.snapshotRegistry().epochsList());
 
         // Advance clock to timeout Complete #1.
         timer.advanceClock(4);
@@ -1764,7 +1754,7 @@ public class CoordinatorRuntimeTest {
         // operation timeouts because the record has been written to the log.
         assertEquals(1L, ctx.coordinator.lastWrittenOffset());
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
-        assertEquals(Arrays.asList(0L, 1L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(0L, 1L), 
ctx.coordinator.snapshotRegistry().epochsList());
     }
 
     @Test
@@ -1794,7 +1784,7 @@ public class CoordinatorRuntimeTest {
         CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
         assertEquals(0, ctx.coordinator.lastWrittenOffset());
         assertEquals(0, ctx.coordinator.lastCommittedOffset());
-        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
 
         // Write #1. It should succeed and be applied to the coordinator.
         runtime.scheduleTransactionalWriteOperation(
@@ -1804,14 +1794,14 @@ public class CoordinatorRuntimeTest {
             100L,
             (short) 5,
             DEFAULT_WRITE_TIMEOUT,
-            state -> new CoordinatorResult<>(Arrays.asList("record1", 
"record2"), "response1"),
+            state -> new CoordinatorResult<>(List.of("record1", "record2"), 
"response1"),
             TXN_OFFSET_COMMIT_LATEST_VERSION
         );
 
         // Verify that the state has been updated.
         assertEquals(2L, ctx.coordinator.lastWrittenOffset());
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
-        assertEquals(Arrays.asList(0L, 2L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(0L, 2L), 
ctx.coordinator.snapshotRegistry().epochsList());
         assertEquals(Set.of("record1", "record2"), 
ctx.coordinator.coordinator().pendingRecords(100L));
         assertEquals(Collections.emptySet(), 
ctx.coordinator.coordinator().records());
 
@@ -1830,7 +1820,7 @@ public class CoordinatorRuntimeTest {
         // Verify that the state has not changed.
         assertEquals(2L, ctx.coordinator.lastWrittenOffset());
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
-        assertEquals(Arrays.asList(0L, 2L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(0L, 2L), 
ctx.coordinator.snapshotRegistry().epochsList());
         assertEquals(Set.of("record1", "record2"), 
ctx.coordinator.coordinator().pendingRecords(100L));
         assertEquals(Collections.emptySet(), 
ctx.coordinator.coordinator().records());
     }
@@ -1860,7 +1850,7 @@ public class CoordinatorRuntimeTest {
         CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
         assertEquals(0L, ctx.coordinator.lastWrittenOffset());
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
-        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
 
         // Override the coordinator with a coordinator that throws
         // an exception when replayEndTransactionMarker is called.
@@ -1889,17 +1879,17 @@ public class CoordinatorRuntimeTest {
             100L,
             (short) 5,
             DEFAULT_WRITE_TIMEOUT,
-            state -> new CoordinatorResult<>(Arrays.asList("record1", 
"record2"), "response1"),
+            state -> new CoordinatorResult<>(List.of("record1", "record2"), 
"response1"),
             TXN_OFFSET_COMMIT_LATEST_VERSION
         );
 
         // Verify that the state has been updated.
         assertEquals(2L, ctx.coordinator.lastWrittenOffset());
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
-        assertEquals(Arrays.asList(0L, 2L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(0L, 2L), 
ctx.coordinator.snapshotRegistry().epochsList());
         assertEquals(Set.of("record1", "record2"), 
ctx.coordinator.coordinator().pendingRecords(100L));
         assertEquals(Collections.emptySet(), 
ctx.coordinator.coordinator().records());
-        assertEquals(Collections.singletonList(
+        assertEquals(List.of(
             transactionalRecords(100L, (short) 5, timer.time().milliseconds(), 
"record1", "record2")
         ), writer.entries(TP));
 
@@ -1918,10 +1908,10 @@ public class CoordinatorRuntimeTest {
         // Verify that the state has not changed.
         assertEquals(2L, ctx.coordinator.lastWrittenOffset());
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
-        assertEquals(Arrays.asList(0L, 2L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(0L, 2L), 
ctx.coordinator.snapshotRegistry().epochsList());
         assertEquals(Set.of("record1", "record2"), 
ctx.coordinator.coordinator().pendingRecords(100L));
         assertEquals(Collections.emptySet(), 
ctx.coordinator.coordinator().records());
-        assertEquals(Collections.singletonList(
+        assertEquals(List.of(
             transactionalRecords(100L, (short) 5, timer.time().milliseconds(), 
"record1", "record2")
         ), writer.entries(TP));
     }
@@ -1955,11 +1945,11 @@ public class CoordinatorRuntimeTest {
 
         // Write #1.
         CompletableFuture<String> write1 = 
runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
-            state -> new CoordinatorResult<>(Arrays.asList("record1", 
"record2"), "response1"));
+            state -> new CoordinatorResult<>(List.of("record1", "record2"), 
"response1"));
 
         // Write #2.
         CompletableFuture<String> write2 = 
runtime.scheduleWriteOperation("write#2", TP, DEFAULT_WRITE_TIMEOUT,
-            state -> new CoordinatorResult<>(Arrays.asList("record3", 
"record4"), "response2"));
+            state -> new CoordinatorResult<>(List.of("record3", "record4"), 
"response2"));
 
         // Commit write #1.
         writer.commit(TP, 2);
@@ -2038,11 +2028,11 @@ public class CoordinatorRuntimeTest {
 
         // Write #1.
         runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
-            state -> new CoordinatorResult<>(Arrays.asList("record1", 
"record2"), "response1"));
+            state -> new CoordinatorResult<>(List.of("record1", "record2"), 
"response1"));
 
         // Write #2.
         runtime.scheduleWriteOperation("write#2", TP, DEFAULT_WRITE_TIMEOUT,
-            state -> new CoordinatorResult<>(Arrays.asList("record3", 
"record4"), "response2"));
+            state -> new CoordinatorResult<>(List.of("record3", "record4"), 
"response2"));
 
         // Commit write #1.
         writer.commit(TP, 2);
@@ -2085,11 +2075,11 @@ public class CoordinatorRuntimeTest {
 
         // Writes
         runtime.scheduleWriteOperation("write#0", coordinator0, 
DEFAULT_WRITE_TIMEOUT,
-            state -> new 
CoordinatorResult<>(Collections.singletonList("record0"), "response0"));
+            state -> new CoordinatorResult<>(List.of("record0"), "response0"));
         runtime.scheduleWriteOperation("write#1", coordinator1, 
DEFAULT_WRITE_TIMEOUT,
-            state -> new 
CoordinatorResult<>(Collections.singletonList("record1"), "response1"));
+            state -> new CoordinatorResult<>(List.of("record1"), "response1"));
         runtime.scheduleWriteOperation("write#2", coordinator2, 
DEFAULT_WRITE_TIMEOUT,
-            state -> new 
CoordinatorResult<>(Collections.singletonList("record2"), "response2"));
+            state -> new CoordinatorResult<>(List.of("record2"), "response2"));
 
         // Commit writes.
         writer.commit(coordinator0);
@@ -2103,7 +2093,7 @@ public class CoordinatorRuntimeTest {
         );
 
         assertEquals(
-            Arrays.asList("record0", "record1", "record2"),
+            List.of("record0", "record1", "record2"),
             FutureUtils.combineFutures(responses, ArrayList::new, 
List::addAll).get(5, TimeUnit.SECONDS)
         );
     }
@@ -2136,11 +2126,11 @@ public class CoordinatorRuntimeTest {
 
         // Write #1.
         CompletableFuture<String> write1 = 
runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
-            state -> new CoordinatorResult<>(Arrays.asList("record1", 
"record2"), "response1"));
+            state -> new CoordinatorResult<>(List.of("record1", "record2"), 
"response1"));
 
         // Write #2.
         CompletableFuture<String> write2 = 
runtime.scheduleWriteOperation("write#2", TP, DEFAULT_WRITE_TIMEOUT,
-            state -> new CoordinatorResult<>(Arrays.asList("record3", 
"record4"), "response2"));
+            state -> new CoordinatorResult<>(List.of("record3", "record4"), 
"response2"));
 
         // Writes are inflight.
         assertFalse(write1.isDone());
@@ -2151,7 +2141,7 @@ public class CoordinatorRuntimeTest {
 
         // Timer #1. This is never executed.
         ctx.timer.schedule("timer-1", 10, TimeUnit.SECONDS, true,
-            () -> new CoordinatorResult<>(Arrays.asList("record5", "record6"), 
null));
+            () -> new CoordinatorResult<>(List.of("record5", "record6"), 
null));
 
         // The coordinator timer should have one pending task.
         assertEquals(1, ctx.timer.size());
@@ -2271,11 +2261,11 @@ public class CoordinatorRuntimeTest {
 
         // Timer #1.
         ctx.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true,
-            () -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), 
null));
+            () -> new CoordinatorResult<>(List.of("record1", "record2"), 
null));
 
         // Timer #2.
         ctx.timer.schedule("timer-2", 20, TimeUnit.MILLISECONDS, true,
-            () -> new CoordinatorResult<>(Arrays.asList("record3", "record4"), 
null));
+            () -> new CoordinatorResult<>(List.of("record3", "record4"), 
null));
 
         // The coordinator timer should have two pending tasks.
         assertEquals(2, ctx.timer.size());
@@ -2329,7 +2319,7 @@ public class CoordinatorRuntimeTest {
 
         // Timer #1.
         ctx.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true,
-            () -> new 
CoordinatorResult<>(Collections.singletonList("record1"), null));
+            () -> new CoordinatorResult<>(List.of("record1"), null));
 
         // The coordinator timer should have one pending task.
         assertEquals(1, ctx.timer.size());
@@ -2342,14 +2332,14 @@ public class CoordinatorRuntimeTest {
 
         // Schedule a second timer with the same key.
         ctx.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true,
-            () -> new 
CoordinatorResult<>(Collections.singletonList("record2"), null));
+            () -> new CoordinatorResult<>(List.of("record2"), null));
 
         // The coordinator timer should still have one pending task.
         assertEquals(1, ctx.timer.size());
 
         // Schedule a third timer with the same key.
         ctx.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true,
-            () -> new 
CoordinatorResult<>(Collections.singletonList("record3"), null));
+            () -> new CoordinatorResult<>(List.of("record3"), null));
 
         // The coordinator timer should still have one pending task.
         assertEquals(1, ctx.timer.size());
@@ -2404,7 +2394,7 @@ public class CoordinatorRuntimeTest {
 
         // Timer #1.
         ctx.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true,
-            () -> new 
CoordinatorResult<>(Collections.singletonList("record1"), null));
+            () -> new CoordinatorResult<>(List.of("record1"), null));
 
         // The coordinator timer should have one pending task.
         assertEquals(1, ctx.timer.size());
@@ -2417,7 +2407,7 @@ public class CoordinatorRuntimeTest {
 
         // Schedule a second timer with the same key.
         ctx.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, true,
-            () -> new 
CoordinatorResult<>(Collections.singletonList("record2"), null));
+            () -> new CoordinatorResult<>(List.of("record2"), null));
 
         // The coordinator timer should still have one pending task.
         assertEquals(1, ctx.timer.size());
@@ -2819,8 +2809,8 @@ public class CoordinatorRuntimeTest {
                         1500,
                         30,
                         3000),
-                    Arrays.asList(5L, 15L, 27L),
-                    Arrays.asList(5L, 15L)))
+                    List.of(5L, 15L, 27L),
+                    List.of(5L, 15L)))
                 .withEventProcessor(new DirectEventProcessor())
                 .withPartitionWriter(writer)
                 .withCoordinatorShardBuilderSupplier(supplier)
@@ -2937,18 +2927,18 @@ public class CoordinatorRuntimeTest {
 
         // Write #1.
         CompletableFuture<String> write1 = 
runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
-            state -> new 
CoordinatorResult<>(Collections.singletonList("record1"), "response1")
+            state -> new CoordinatorResult<>(List.of("record1"), "response1")
         );
         processor.poll();
 
         // Write #2.
         CompletableFuture<String> write2 = 
runtime.scheduleWriteOperation("write#2", TP, DEFAULT_WRITE_TIMEOUT,
-            state -> new 
CoordinatorResult<>(Collections.singletonList("record2"), "response2")
+            state -> new CoordinatorResult<>(List.of("record2"), "response2")
         );
         processor.poll();
 
         // Records have been written to the log.
-        assertEquals(Arrays.asList(
+        assertEquals(List.of(
             records(timer.time().milliseconds(), "record1"),
             records(timer.time().milliseconds(), "record2")
         ), writer.entries(TP));
@@ -3007,18 +2997,18 @@ public class CoordinatorRuntimeTest {
 
         // Write#1.
         CompletableFuture<String> write1 = 
runtime.scheduleWriteOperation("Write#1", TP, DEFAULT_WRITE_TIMEOUT,
-            state -> new 
CoordinatorResult<>(Collections.singletonList("record1"), "response1")
+            state -> new CoordinatorResult<>(List.of("record1"), "response1")
         );
         processor.poll();
 
         // Write#2.
         CompletableFuture<String> write2 = 
runtime.scheduleWriteOperation("Write#2", TP, DEFAULT_WRITE_TIMEOUT,
-            state -> new 
CoordinatorResult<>(Collections.singletonList("record2"), "response2")
+            state -> new CoordinatorResult<>(List.of("record2"), "response2")
         );
         processor.poll();
 
         // Records have been written to the log.
-        assertEquals(Arrays.asList(
+        assertEquals(List.of(
             records(timer.time().milliseconds(), "record1"),
             records(timer.time().milliseconds(), "record2")
         ), writer.entries(TP));
@@ -3092,7 +3082,7 @@ public class CoordinatorRuntimeTest {
         processor.poll();
 
         // Records have been written to the log.
-        assertEquals(Collections.singletonList(
+        assertEquals(List.of(
             endTransactionMarker(100, (short) 50, timer.time().milliseconds(), 
1, ControlRecordType.COMMIT)
         ), writer.entries(TP));
 
@@ -3149,7 +3139,7 @@ public class CoordinatorRuntimeTest {
         CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
         assertEquals(0L, ctx.coordinator.lastWrittenOffset());
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
-        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
 
         int maxBatchSize = writer.config(TP).maxMessageSize();
         assertTrue(maxBatchSize > MIN_BUFFER_SIZE);
@@ -3200,7 +3190,7 @@ public class CoordinatorRuntimeTest {
         CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
         assertEquals(0L, ctx.coordinator.lastWrittenOffset());
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
-        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
         assertNull(ctx.currentBatch);
 
         // Get the max batch size.
@@ -3229,8 +3219,8 @@ public class CoordinatorRuntimeTest {
         // Verify the state. Records are replayed but no batch written.
         assertEquals(0L, ctx.coordinator.lastWrittenOffset());
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
-        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
-        assertEquals(Arrays.asList(
+        assertEquals(List.of(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(
             new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
             new MockCoordinatorShard.RecordAndMetadata(1, records.get(1))
         ), ctx.coordinator.coordinator().fullRecords());
@@ -3247,8 +3237,8 @@ public class CoordinatorRuntimeTest {
         // Verify the state. Records are replayed but no batch written.
         assertEquals(0L, ctx.coordinator.lastWrittenOffset());
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
-        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
-        assertEquals(Arrays.asList(
+        assertEquals(List.of(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(
             new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
             new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
             new MockCoordinatorShard.RecordAndMetadata(2, records.get(2))
@@ -3268,14 +3258,14 @@ public class CoordinatorRuntimeTest {
         // got flushed with all the records but the new one from #3.
         assertEquals(3L, ctx.coordinator.lastWrittenOffset());
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
-        assertEquals(Arrays.asList(0L, 3L), 
ctx.coordinator.snapshotRegistry().epochsList());
-        assertEquals(Arrays.asList(
+        assertEquals(List.of(0L, 3L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(
             new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
             new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
             new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
             new MockCoordinatorShard.RecordAndMetadata(3, records.get(3))
         ), ctx.coordinator.coordinator().fullRecords());
-        assertEquals(Collections.singletonList(
+        assertEquals(List.of(
             records(timer.time().milliseconds(), records.subList(0, 3))
         ), writer.entries(TP));
 
@@ -3285,14 +3275,14 @@ public class CoordinatorRuntimeTest {
         // Verify the state. The pending batch is flushed.
         assertEquals(4L, ctx.coordinator.lastWrittenOffset());
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
-        assertEquals(Arrays.asList(0L, 3L, 4L), 
ctx.coordinator.snapshotRegistry().epochsList());
-        assertEquals(Arrays.asList(
+        assertEquals(List.of(0L, 3L, 4L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(
             new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
             new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
             new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
             new MockCoordinatorShard.RecordAndMetadata(3, records.get(3))
         ), ctx.coordinator.coordinator().fullRecords());
-        assertEquals(Arrays.asList(
+        assertEquals(List.of(
             records(timer.time().milliseconds() - 11, records.subList(0, 3)),
             records(timer.time().milliseconds() - 11, records.subList(3, 4))
         ), writer.entries(TP));
@@ -3334,7 +3324,7 @@ public class CoordinatorRuntimeTest {
         CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
         assertEquals(0L, ctx.coordinator.lastWrittenOffset());
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
-        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
         assertNull(ctx.currentBatch);
 
         // Get the max batch size.
@@ -3385,7 +3375,7 @@ public class CoordinatorRuntimeTest {
         CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
         assertEquals(0L, ctx.coordinator.lastWrittenOffset());
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
-        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
         assertNull(ctx.currentBatch);
 
         // Get the max batch size.
@@ -3415,8 +3405,8 @@ public class CoordinatorRuntimeTest {
         // Verify the state.
         assertEquals(0L, ctx.coordinator.lastWrittenOffset());
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
-        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
-        assertEquals(Arrays.asList(
+        assertEquals(List.of(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(
             new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
             new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
             new MockCoordinatorShard.RecordAndMetadata(2, records.get(2))
@@ -3438,7 +3428,7 @@ public class CoordinatorRuntimeTest {
         // Verify the state. The state should be reverted to the initial state.
         assertEquals(0L, ctx.coordinator.lastWrittenOffset());
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
-        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
         assertEquals(Collections.emptyList(), 
ctx.coordinator.coordinator().fullRecords());
         assertEquals(Collections.emptyList(), writer.entries(TP));
     }
@@ -3470,7 +3460,7 @@ public class CoordinatorRuntimeTest {
         CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
         assertEquals(0L, ctx.coordinator.lastWrittenOffset());
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
-        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
         assertNull(ctx.currentBatch);
 
         // Override the coordinator with a coordinator that throws
@@ -3518,8 +3508,8 @@ public class CoordinatorRuntimeTest {
         // Verify the state.
         assertEquals(0L, ctx.coordinator.lastWrittenOffset());
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
-        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
-        assertEquals(Collections.singletonList(
+        assertEquals(List.of(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(
             new MockCoordinatorShard.RecordAndMetadata(0, records.get(0))
         ), ctx.coordinator.coordinator().fullRecords());
         assertEquals(Collections.emptyList(), writer.entries(TP));
@@ -3535,7 +3525,7 @@ public class CoordinatorRuntimeTest {
         // Verify the state.
         assertEquals(0L, ctx.coordinator.lastWrittenOffset());
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
-        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
         assertEquals(Collections.emptyList(), 
ctx.coordinator.coordinator().fullRecords());
         assertEquals(Collections.emptyList(), writer.entries(TP));
     }
@@ -3567,12 +3557,12 @@ public class CoordinatorRuntimeTest {
         CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
         assertEquals(0L, ctx.coordinator.lastWrittenOffset());
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
-        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
         assertNull(ctx.currentBatch);
 
         // Write #1 with one record.
         CompletableFuture<String> write1 = 
runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
-            state -> new 
CoordinatorResult<>(Collections.singletonList("record#1"), "response1")
+            state -> new CoordinatorResult<>(List.of("record#1"), "response1")
         );
 
         // Verify that the write is not committed yet.
@@ -3581,7 +3571,7 @@ public class CoordinatorRuntimeTest {
         // Verify the state. Records are replayed but no batch written.
         assertEquals(0L, ctx.coordinator.lastWrittenOffset());
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
-        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
         assertEquals(Collections.emptySet(), 
ctx.coordinator.coordinator().pendingRecords(100L));
         assertEquals(Set.of("record#1"), 
ctx.coordinator.coordinator().records());
         assertEquals(Collections.emptyList(), writer.entries(TP));
@@ -3594,7 +3584,7 @@ public class CoordinatorRuntimeTest {
             100L,
             (short) 50,
             Duration.ofMillis(20),
-            state -> new 
CoordinatorResult<>(Collections.singletonList("record#2"), "response2"),
+            state -> new CoordinatorResult<>(List.of("record#2"), "response2"),
             TXN_OFFSET_COMMIT_LATEST_VERSION
         );
 
@@ -3605,17 +3595,17 @@ public class CoordinatorRuntimeTest {
         // written to the log.
         assertEquals(2L, ctx.coordinator.lastWrittenOffset());
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
-        assertEquals(Arrays.asList(0L, 1L, 2L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(0L, 1L, 2L), 
ctx.coordinator.snapshotRegistry().epochsList());
         assertEquals(Set.of("record#2"), 
ctx.coordinator.coordinator().pendingRecords(100L));
         assertEquals(Set.of("record#1"), 
ctx.coordinator.coordinator().records());
-        assertEquals(Arrays.asList(
+        assertEquals(List.of(
             records(timer.time().milliseconds(), "record#1"),
             transactionalRecords(100L, (short) 50, 
timer.time().milliseconds(), "record#2")
         ), writer.entries(TP));
 
         // Write #3 with one record.
         CompletableFuture<String> write3 = 
runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(20),
-            state -> new 
CoordinatorResult<>(Collections.singletonList("record#3"), "response3")
+            state -> new CoordinatorResult<>(List.of("record#3"), "response3")
         );
 
         // Verify that the write is not committed yet.
@@ -3624,10 +3614,10 @@ public class CoordinatorRuntimeTest {
         // Verify the state.
         assertEquals(2L, ctx.coordinator.lastWrittenOffset());
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
-        assertEquals(Arrays.asList(0L, 1L, 2L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(0L, 1L, 2L), 
ctx.coordinator.snapshotRegistry().epochsList());
         assertEquals(Set.of("record#2"), 
ctx.coordinator.coordinator().pendingRecords(100L));
         assertEquals(Set.of("record#1", "record#3"), 
ctx.coordinator.coordinator().records());
-        assertEquals(Arrays.asList(
+        assertEquals(List.of(
             records(timer.time().milliseconds(), "record#1"),
             transactionalRecords(100L, (short) 50, 
timer.time().milliseconds(), "record#2")
         ), writer.entries(TP));
@@ -3649,10 +3639,10 @@ public class CoordinatorRuntimeTest {
         // Verify the state.
         assertEquals(4L, ctx.coordinator.lastWrittenOffset());
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
-        assertEquals(Arrays.asList(0L, 1L, 2L, 3L, 4L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(0L, 1L, 2L, 3L, 4L), 
ctx.coordinator.snapshotRegistry().epochsList());
         assertEquals(Collections.emptySet(), 
ctx.coordinator.coordinator().pendingRecords(100L));
         assertEquals(Set.of("record#1", "record#2", "record#3"), 
ctx.coordinator.coordinator().records());
-        assertEquals(Arrays.asList(
+        assertEquals(List.of(
             records(timer.time().milliseconds(), "record#1"),
             transactionalRecords(100L, (short) 50, 
timer.time().milliseconds(), "record#2"),
             records(timer.time().milliseconds(), "record#3"),
@@ -3710,7 +3700,7 @@ public class CoordinatorRuntimeTest {
         assertEquals(ACTIVE, ctx.state);
         assertEquals(0L, ctx.coordinator.lastWrittenOffset());
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
-        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
         assertNull(ctx.currentBatch);
 
         // Keep a reference to the current coordinator.
@@ -3823,7 +3813,7 @@ public class CoordinatorRuntimeTest {
         CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
         assertEquals(2L, ctx.coordinator.lastWrittenOffset());
         assertEquals(1L, ctx.coordinator.lastCommittedOffset());
-        assertEquals(Collections.singletonList(2L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(2L), 
ctx.coordinator.snapshotRegistry().epochsList());
 
         // Schedule a write operation that does not generate any records.
         CompletableFuture<String> write = 
runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
@@ -3838,7 +3828,7 @@ public class CoordinatorRuntimeTest {
         // Verify the state.
         assertEquals(2L, ctx.coordinator.lastWrittenOffset());
         assertEquals(2L, ctx.coordinator.lastCommittedOffset());
-        assertEquals(Collections.singletonList(2L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(2L), 
ctx.coordinator.snapshotRegistry().epochsList());
 
         // The write operation should be completed.
         assertEquals("response1", write.get(5, TimeUnit.SECONDS));
@@ -3871,7 +3861,7 @@ public class CoordinatorRuntimeTest {
         CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
         assertEquals(0L, ctx.coordinator.lastWrittenOffset());
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
-        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
         assertNull(ctx.currentBatch);
 
         // Get the max batch size.
@@ -3908,14 +3898,14 @@ public class CoordinatorRuntimeTest {
         // the first three records. The 4th one is pending.
         assertEquals(3L, ctx.coordinator.lastWrittenOffset());
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
-        assertEquals(Arrays.asList(0L, 3L), 
ctx.coordinator.snapshotRegistry().epochsList());
-        assertEquals(Arrays.asList(
+        assertEquals(List.of(0L, 3L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(
             new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
             new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
             new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
             new MockCoordinatorShard.RecordAndMetadata(3, records.get(3))
         ), ctx.coordinator.coordinator().fullRecords());
-        assertEquals(Collections.singletonList(
+        assertEquals(List.of(
             records(timer.time().milliseconds(), records.subList(0, 3))
         ), writer.entries(TP));
 
@@ -3932,14 +3922,14 @@ public class CoordinatorRuntimeTest {
         assertNull(ctx.currentBatch);
         assertEquals(4L, ctx.coordinator.lastWrittenOffset());
         assertEquals(3L, ctx.coordinator.lastCommittedOffset());
-        assertEquals(Arrays.asList(3L, 4L), 
ctx.coordinator.snapshotRegistry().epochsList());
-        assertEquals(Arrays.asList(
+        assertEquals(List.of(3L, 4L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(
             new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
             new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
             new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
             new MockCoordinatorShard.RecordAndMetadata(3, records.get(3))
         ), ctx.coordinator.coordinator().fullRecords());
-        assertEquals(Arrays.asList(
+        assertEquals(List.of(
             records(timer.time().milliseconds() - 11, records.subList(0, 3)),
             records(timer.time().milliseconds() - 11, records.subList(3, 4))
         ), writer.entries(TP));
@@ -3979,7 +3969,7 @@ public class CoordinatorRuntimeTest {
         CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
         assertEquals(0L, ctx.coordinator.lastWrittenOffset());
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
-        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
         assertNull(ctx.currentBatch);
 
         // Get the max batch size.
@@ -4007,8 +3997,8 @@ public class CoordinatorRuntimeTest {
         // Verify the state.
         assertEquals(0L, ctx.coordinator.lastWrittenOffset());
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
-        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
-        assertEquals(Arrays.asList(
+        assertEquals(List.of(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(
             new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
             new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
             new MockCoordinatorShard.RecordAndMetadata(2, records.get(2))
@@ -4020,7 +4010,7 @@ public class CoordinatorRuntimeTest {
         // Note that the batch will fail only when the batch is written 
because the
         // MemoryBatchBuilder always accept one record.
         CompletableFuture<String> write2 = 
runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
-            state -> new 
CoordinatorResult<>(Collections.singletonList(record), "write#2", null, true, 
false)
+            state -> new CoordinatorResult<>(List.of(record), "write#2", null, 
true, false)
         );
 
         // Advance past the linger time to flush the pending batch.
@@ -4035,13 +4025,13 @@ public class CoordinatorRuntimeTest {
         // Verify the state.
         assertEquals(3L, ctx.coordinator.lastWrittenOffset());
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
-        assertEquals(Arrays.asList(0L, 3L), 
ctx.coordinator.snapshotRegistry().epochsList());
-        assertEquals(Arrays.asList(
+        assertEquals(List.of(0L, 3L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(
             new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
             new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
             new MockCoordinatorShard.RecordAndMetadata(2, records.get(2))
         ), ctx.coordinator.coordinator().fullRecords());
-        assertEquals(Collections.singletonList(
+        assertEquals(List.of(
             records(timer.time().milliseconds() - 11, records.subList(0, 3))
         ), writer.entries(TP));
     }
@@ -4074,7 +4064,7 @@ public class CoordinatorRuntimeTest {
         CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
         assertEquals(0L, ctx.coordinator.lastWrittenOffset());
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
-        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
         assertNull(ctx.currentBatch);
 
         // Get the max batch size.
@@ -4104,8 +4094,8 @@ public class CoordinatorRuntimeTest {
         // Verify the state.
         assertEquals(0L, ctx.coordinator.lastWrittenOffset());
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
-        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
-        assertEquals(Arrays.asList(
+        assertEquals(List.of(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(
             new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
             new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
             new MockCoordinatorShard.RecordAndMetadata(2, records.get(2))
@@ -4127,7 +4117,7 @@ public class CoordinatorRuntimeTest {
         // Verify the state. The state should be reverted to the initial state.
         assertEquals(0L, ctx.coordinator.lastWrittenOffset());
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
-        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
         assertEquals(Collections.emptyList(), 
ctx.coordinator.coordinator().fullRecords());
         assertEquals(Collections.emptyList(), writer.entries(TP));
     }
@@ -4203,13 +4193,13 @@ public class CoordinatorRuntimeTest {
         // got flushed with all the records but the new one from #3.
         assertEquals(3L, ctx.coordinator.lastWrittenOffset());
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
-        assertEquals(Arrays.asList(
+        assertEquals(List.of(
             new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
             new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
             new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
             new MockCoordinatorShard.RecordAndMetadata(3, records.get(3))
         ), ctx.coordinator.coordinator().fullRecords());
-        assertEquals(Collections.singletonList(
+        assertEquals(List.of(
             records(firstBatchTimestamp, records.subList(0, 3))
         ), writer.entries(TP));
         verify(runtimeMetrics, times(1)).recordFlushTime(10);
@@ -4220,13 +4210,13 @@ public class CoordinatorRuntimeTest {
         // Verify the state. The pending batch is flushed.
         assertEquals(4L, ctx.coordinator.lastWrittenOffset());
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
-        assertEquals(Arrays.asList(
+        assertEquals(List.of(
             new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
             new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
             new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
             new MockCoordinatorShard.RecordAndMetadata(3, records.get(3))
         ), ctx.coordinator.coordinator().fullRecords());
-        assertEquals(Arrays.asList(
+        assertEquals(List.of(
             records(secondBatchTimestamp, records.subList(0, 3)),
             records(secondBatchTimestamp, records.subList(3, 4))
         ), writer.entries(TP));
@@ -4273,11 +4263,11 @@ public class CoordinatorRuntimeTest {
 
         // write#1 will be committed and update the high watermark. Record 
time spent in purgatory.
         CompletableFuture<String> write1 = 
runtime.scheduleWriteOperation("write#1", TP, writeTimeout,
-            state -> new 
CoordinatorResult<>(Collections.singletonList("record1"), "response1")
+            state -> new CoordinatorResult<>(List.of("record1"), "response1")
         );
         // write#2 will time out sitting in the purgatory. Record time spent 
in purgatory.
         CompletableFuture<String> write2 = 
runtime.scheduleWriteOperation("write#2", TP, writeTimeout,
-            state -> new 
CoordinatorResult<>(Collections.singletonList("record2"), "response2")
+            state -> new CoordinatorResult<>(List.of("record2"), "response2")
         );
         // write#3 will error while appending. Does not spend time in 
purgatory.
         CompletableFuture<String> write3 = 
runtime.scheduleWriteOperation("write#3", TP, writeTimeout,
@@ -4295,7 +4285,7 @@ public class CoordinatorRuntimeTest {
 
         // Records have been written to the log.
         long writeTimestamp = timer.time().milliseconds();
-        assertEquals(Arrays.asList(
+        assertEquals(List.of(
             records(writeTimestamp, "record1"),
             records(writeTimestamp, "record2")
         ), writer.entries(TP));

Reply via email to