This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a39fcac95c8 MINOR: Clean up coordinator-common and server modules (#19009)
a39fcac95c8 is described below

commit a39fcac95c82133ac6d9116216ae819d0bf9a6bd
Author: Sanskar Jhajharia <[email protected]>
AuthorDate: Thu Feb 27 14:39:21 2025 +0530

    MINOR: Clean up coordinator-common and server modules (#19009)
    
    Given that we now support Java 17 on our brokers, this PR replaces
    the following usages (illustrated in the sketch below):
    
    Collections.singletonList() and Collections.emptyList() with List.of()
    Collections.singletonMap() and Collections.emptyMap() with Map.of()
    Collections.singleton() and Collections.emptySet() with Set.of()
    
    Affected modules: server and coordinator-common
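    
    A quick illustrative sketch (an editorial aside, not part of this
    commit): the new factories are drop-in replacements at these call
    sites, with one caveat worth knowing. The Java 9+ factories reject
    nulls, which the Collections helpers tolerated.
    
        import java.util.List;
        import java.util.Map;
        import java.util.Set;
        
        public class FactoryMethodsDemo {
            public static void main(String[] args) {
                List<String> empty = List.of();          // was Collections.emptyList()
                List<String> one = List.of("record");    // was Collections.singletonList("record")
                Map<String, String> tags = Map.of("request", "Fetch"); // was Collections.singletonMap(...)
                Set<String> single = Set.of("DESCRIBE");               // was Collections.singleton(...)
        
                // All are unmodifiable, like the old helpers, but note:
                // Collections.singletonList(null) is legal, while
                // List.of((String) null) throws NullPointerException.
                System.out.println(empty + " " + one + " " + tags + " " + single);
            }
        }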
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../common/runtime/CoordinatorRuntime.java         |  3 +-
 .../runtime/CoordinatorExecutorImplTest.java       | 12 ++--
 .../common/runtime/CoordinatorResultTest.java      | 10 +--
 .../common/runtime/CoordinatorRuntimeTest.java     | 76 +++++++++++-----------
 .../common/runtime/InMemoryPartitionWriter.java    |  2 +-
 .../common/runtime/KafkaMetricHistogramTest.java   | 12 ++--
 .../kafka/network/ConnectionQuotaEntity.java       |  7 +-
 .../kafka/network/metrics/RequestMetrics.java      |  3 +-
 .../apache/kafka/security/authorizer/AclEntry.java |  3 +-
 .../apache/kafka/server/AssignmentsManager.java    |  5 +-
 .../apache/kafka/server/ClientMetricsManager.java  |  4 +-
 .../kafka/server/metrics/BrokerServerMetrics.java  |  6 +-
 .../kafka/server/metrics/ClientMetricsConfigs.java |  7 +-
 .../kafka/server/share/context/FinalContext.java   |  4 +-
 .../server/share/context/ShareFetchContext.java    |  3 +-
 .../server/share/context/ShareSessionContext.java  | 13 ++--
 .../kafka/network/RequestConvertToJsonTest.java    |  5 +-
 .../kafka/server/AssignmentsManagerTest.java       | 11 ++--
 .../server/metrics/BrokerServerMetricsTest.java    | 13 ++--
 .../metrics/ClientMetricsInstanceMetadataTest.java | 27 ++++----
 .../share/session/ShareSessionCacheTest.java       |  6 +-
 .../server/share/session/ShareSessionTest.java     |  5 +-
 22 files changed, 114 insertions(+), 123 deletions(-)

diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
index 1e9724a57aa..283a8df49ab 100644
--- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
+++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
@@ -52,7 +52,6 @@ import org.slf4j.Logger;
 import java.nio.ByteBuffer;
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -2534,7 +2533,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
      */
     public List<TopicPartition> activeTopicPartitions() {
         if (coordinators == null || coordinators.isEmpty()) {
-            return Collections.emptyList();
+            return List.of();
         }
 
         return coordinators.entrySet().stream()
diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImplTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImplTest.java
index d5ac1be7820..4f5e917f179 100644
--- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImplTest.java
+++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImplTest.java
@@ -23,7 +23,7 @@ import org.apache.kafka.server.util.FutureUtils;
 import org.junit.jupiter.api.Test;
 
 import java.time.Duration;
-import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
@@ -71,7 +71,7 @@ public class CoordinatorExecutorImplTest {
             CoordinatorRuntime.CoordinatorWriteOperation<CoordinatorShard<String>, Void, String> op =
                 args.getArgument(3);
             assertEquals(
-                new CoordinatorResult<>(Collections.singletonList("record"), null),
+                new CoordinatorResult<>(List.of("record"), null),
                 op.generateRecordsAndResult(coordinatorShard)
             );
             return CompletableFuture.completedFuture(null);
@@ -95,7 +95,7 @@ public class CoordinatorExecutorImplTest {
             operationCalled.set(true);
             assertEquals("Hello!", result);
             assertNull(exception);
-            return new CoordinatorResult<>(Collections.singletonList("record"), null);
+            return new CoordinatorResult<>(List.of("record"), null);
         };
 
         executor.schedule(
@@ -130,7 +130,7 @@ public class CoordinatorExecutorImplTest {
             CoordinatorRuntime.CoordinatorWriteOperation<CoordinatorShard<String>, Void, String> op =
                 args.getArgument(3);
             assertEquals(
-                new CoordinatorResult<>(Collections.emptyList(), null),
+                new CoordinatorResult<>(List.of(), null),
                 op.generateRecordsAndResult(coordinatorShard)
             );
             return CompletableFuture.completedFuture(null);
@@ -154,7 +154,7 @@ public class CoordinatorExecutorImplTest {
             assertNull(result);
             assertNotNull(exception);
             assertEquals("Oh no!", exception.getMessage());
-            return new CoordinatorResult<>(Collections.emptyList(), null);
+            return new CoordinatorResult<>(List.of(), null);
         };
 
         executor.schedule(
@@ -301,7 +301,7 @@ public class CoordinatorExecutorImplTest {
         AtomicBoolean operationCalled = new AtomicBoolean(false);
         CoordinatorExecutor.TaskOperation<String, String> taskOperation = (result, exception) -> {
             operationCalled.set(true);
-            return new CoordinatorResult<>(Collections.emptyList(), null);
+            return new CoordinatorResult<>(List.of(), null);
         };
 
         executor.schedule(
diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorResultTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorResultTest.java
index 8d050cb1e07..263f14859ff 100644
--- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorResultTest.java
+++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorResultTest.java
@@ -18,7 +18,7 @@ package org.apache.kafka.coordinator.common.runtime;
 
 import org.junit.jupiter.api.Test;
 
-import java.util.Collections;
+import java.util.List;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -26,8 +26,8 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 public class CoordinatorResultTest {
     @Test
     public void testAttributes() {
-        CoordinatorResult<String, CoordinatorRecord> result = new CoordinatorResult<>(Collections.emptyList(), "response");
-        assertEquals(Collections.emptyList(), result.records());
+        CoordinatorResult<String, CoordinatorRecord> result = new CoordinatorResult<>(List.of(), "response");
+        assertEquals(List.of(), result.records());
         assertEquals("response", result.response());
     }
 
@@ -38,8 +38,8 @@ public class CoordinatorResultTest {
 
     @Test
     public void testEquals() {
-        CoordinatorResult<String, CoordinatorRecord> result1 = new CoordinatorResult<>(Collections.emptyList(), "response");
-        CoordinatorResult<String, CoordinatorRecord> result2 = new CoordinatorResult<>(Collections.emptyList(), "response");
+        CoordinatorResult<String, CoordinatorRecord> result1 = new CoordinatorResult<>(List.of(), "response");
+        CoordinatorResult<String, CoordinatorRecord> result2 = new CoordinatorResult<>(List.of(), "response");
         assertEquals(result1, result2);
     }
 }
diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
index c57bfcaae39..72f31b0a7d8 100644
--- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
+++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
@@ -60,11 +60,11 @@ import java.nio.charset.Charset;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.Deque;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.OptionalInt;
 import java.util.Set;
@@ -237,7 +237,7 @@ public class CoordinatorRuntimeTest {
         }
 
         public MockCoordinatorLoader() {
-            this(null, Collections.emptyList(), Collections.emptyList());
+            this(null, List.of(), List.of());
         }
 
         @Override
@@ -448,7 +448,7 @@ public class CoordinatorRuntimeTest {
 
         Set<String> pendingRecords(long producerId) {
             TimelineHashSet<RecordAndMetadata> pending = pendingRecords.get(producerId);
-            if (pending == null) return Collections.emptySet();
+            if (pending == null) return Set.of();
             return pending.stream().map(record -> record.record).collect(Collectors.toUnmodifiableSet());
         }
 
@@ -1342,7 +1342,7 @@ public class CoordinatorRuntimeTest {
 
         // Write #3 but without any records.
         CompletableFuture<String> write3 = runtime.scheduleWriteOperation("write#3", TP, DEFAULT_WRITE_TIMEOUT,
-            state -> new CoordinatorResult<>(Collections.emptyList(), "response3"));
+            state -> new CoordinatorResult<>(List.of(), "response3"));
 
         // Verify that the write is not committed yet.
         assertFalse(write3.isDone());
@@ -1385,7 +1385,7 @@ public class CoordinatorRuntimeTest {
 
         // Write #4 but without records.
         CompletableFuture<String> write4 = runtime.scheduleWriteOperation("write#4", TP, DEFAULT_WRITE_TIMEOUT,
-            state -> new CoordinatorResult<>(Collections.emptyList(), "response4"));
+            state -> new CoordinatorResult<>(List.of(), "response4"));
 
         // It is completed immediately because the state is fully committed.
         assertTrue(write4.isDone());
@@ -1414,7 +1414,7 @@ public class CoordinatorRuntimeTest {
         // Scheduling a write fails with a NotCoordinatorException because the coordinator
         // does not exist.
         CompletableFuture<String> write = runtime.scheduleWriteOperation("write", TP, DEFAULT_WRITE_TIMEOUT,
-            state -> new CoordinatorResult<>(Collections.emptyList(), "response1"));
+            state -> new CoordinatorResult<>(List.of(), "response1"));
         assertFutureThrows(NotCoordinatorException.class, write);
     }
 
@@ -1696,7 +1696,7 @@ public class CoordinatorRuntimeTest {
         verify(writer, times(1)).registerListener(eq(TP), any());
 
         // Prepare the log config.
-        when(writer.config(TP)).thenReturn(new LogConfig(Collections.emptyMap()));
+        when(writer.config(TP)).thenReturn(new LogConfig(Map.of()));
 
         // Prepare the transaction verification.
         VerificationGuard guard = new VerificationGuard();
@@ -1910,7 +1910,7 @@ public class CoordinatorRuntimeTest {
             expectedType = ControlRecordType.COMMIT;
         } else {
             // Or they are gone if aborted.
-            assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().records());
+            assertEquals(Set.of(), ctx.coordinator.coordinator().records());
             expectedType = ControlRecordType.ABORT;
         }
 
@@ -2039,7 +2039,7 @@ public class CoordinatorRuntimeTest {
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
         assertEquals(List.of(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
         assertEquals(Set.of("record1", "record2"), ctx.coordinator.coordinator().pendingRecords(100L));
-        assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().records());
+        assertEquals(Set.of(), ctx.coordinator.coordinator().records());
 
         // Complete transaction #1. It should fail.
         CompletableFuture<Void> complete1 = runtime.scheduleTransactionCompletion(
@@ -2058,7 +2058,7 @@ public class CoordinatorRuntimeTest {
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
         assertEquals(List.of(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
         assertEquals(Set.of("record1", "record2"), ctx.coordinator.coordinator().pendingRecords(100L));
-        assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().records());
+        assertEquals(Set.of(), ctx.coordinator.coordinator().records());
     }
 
     @Test
@@ -2125,7 +2125,7 @@ public class CoordinatorRuntimeTest {
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
         assertEquals(List.of(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
         assertEquals(Set.of("record1", "record2"), ctx.coordinator.coordinator().pendingRecords(100L));
-        assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().records());
+        assertEquals(Set.of(), ctx.coordinator.coordinator().records());
         assertEquals(List.of(
             transactionalRecords(100L, (short) 5, timer.time().milliseconds(), "record1", "record2")
         ), writer.entries(TP));
@@ -2147,7 +2147,7 @@ public class CoordinatorRuntimeTest {
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
         assertEquals(List.of(0L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
         assertEquals(Set.of("record1", "record2"), ctx.coordinator.coordinator().pendingRecords(100L));
-        assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().records());
+        assertEquals(Set.of(), ctx.coordinator.coordinator().records());
         assertEquals(List.of(
             transactionalRecords(100L, (short) 5, timer.time().milliseconds(), "record1", "record2")
         ), writer.entries(TP));
@@ -2680,7 +2680,7 @@ public class CoordinatorRuntimeTest {
         assertTrue(processor.poll());
 
         // Verify that no operation was executed.
-        assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().records());
+        assertEquals(Set.of(), ctx.coordinator.coordinator().records());
         assertEquals(0, ctx.timer.size());
     }
 
@@ -3010,8 +3010,8 @@ public class CoordinatorRuntimeTest {
                         startTimeMs + 500,
                         30,
                         3000),
-                    Collections.emptyList(),
-                    Collections.emptyList()))
+                    List.of(),
+                    List.of()))
                 .withEventProcessor(new DirectEventProcessor())
                 .withPartitionWriter(writer)
                 .withCoordinatorShardBuilderSupplier(supplier)
@@ -3127,8 +3127,8 @@ public class CoordinatorRuntimeTest {
                         1500,
                         30,
                         3000),
-                    Collections.emptyList(),
-                    Collections.emptyList()))
+                    List.of(),
+                    List.of()))
                 .withEventProcessor(new DirectEventProcessor())
                 .withPartitionWriter(writer)
                 .withCoordinatorShardBuilderSupplier(supplier)
@@ -3569,7 +3569,7 @@ public class CoordinatorRuntimeTest {
             new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
             new MockCoordinatorShard.RecordAndMetadata(1, records.get(1))
         ), ctx.coordinator.coordinator().fullRecords());
-        assertEquals(Collections.emptyList(), writer.entries(TP));
+        assertEquals(List.of(), writer.entries(TP));
 
         // Write #2 with one record.
         CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
@@ -3588,7 +3588,7 @@ public class CoordinatorRuntimeTest {
             new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
             new MockCoordinatorShard.RecordAndMetadata(2, records.get(2))
         ), ctx.coordinator.coordinator().fullRecords());
-        assertEquals(Collections.emptyList(), writer.entries(TP));
+        assertEquals(List.of(), writer.entries(TP));
 
         // Write #3 with one record. This one cannot go into the existing batch
         // so the existing batch should be flushed and a new one should be created.
@@ -3758,7 +3758,7 @@ public class CoordinatorRuntimeTest {
             new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
             new MockCoordinatorShard.RecordAndMetadata(2, records.get(2))
         ), ctx.coordinator.coordinator().fullRecords());
-        assertEquals(Collections.emptyList(), writer.entries(TP));
+        assertEquals(List.of(), writer.entries(TP));
 
         // Write #4. This write cannot make it in the current batch. So the current batch
         // is flushed. It will fail. So we expect all writes to fail.
@@ -3776,8 +3776,8 @@ public class CoordinatorRuntimeTest {
         assertEquals(0L, ctx.coordinator.lastWrittenOffset());
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
         assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
-        assertEquals(Collections.emptyList(), ctx.coordinator.coordinator().fullRecords());
-        assertEquals(Collections.emptyList(), writer.entries(TP));
+        assertEquals(List.of(), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(List.of(), writer.entries(TP));
     }
 
     @Test
@@ -3860,7 +3860,7 @@ public class CoordinatorRuntimeTest {
         assertEquals(List.of(
             new MockCoordinatorShard.RecordAndMetadata(0, records.get(0))
         ), ctx.coordinator.coordinator().fullRecords());
-        assertEquals(Collections.emptyList(), writer.entries(TP));
+        assertEquals(List.of(), writer.entries(TP));
 
         // Write #2. It should fail.
         CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
@@ -3874,8 +3874,8 @@ public class CoordinatorRuntimeTest {
         assertEquals(0L, ctx.coordinator.lastWrittenOffset());
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
         assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
-        assertEquals(Collections.emptyList(), ctx.coordinator.coordinator().fullRecords());
-        assertEquals(Collections.emptyList(), writer.entries(TP));
+        assertEquals(List.of(), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(List.of(), writer.entries(TP));
     }
 
     @Test
@@ -3921,9 +3921,9 @@ public class CoordinatorRuntimeTest {
         assertEquals(0L, ctx.coordinator.lastWrittenOffset());
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
         assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
-        assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().pendingRecords(100L));
+        assertEquals(Set.of(), ctx.coordinator.coordinator().pendingRecords(100L));
         assertEquals(Set.of("record#1"), ctx.coordinator.coordinator().records());
-        assertEquals(Collections.emptyList(), writer.entries(TP));
+        assertEquals(List.of(), writer.entries(TP));
 
         // Transactional write #2 with one record. This will flush the current batch.
         CompletableFuture<String> write2 = runtime.scheduleTransactionalWriteOperation(
@@ -3989,7 +3989,7 @@ public class CoordinatorRuntimeTest {
         assertEquals(4L, ctx.coordinator.lastWrittenOffset());
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
         assertEquals(List.of(0L, 1L, 2L, 3L, 4L), ctx.coordinator.snapshotRegistry().epochsList());
-        assertEquals(Collections.emptySet(), ctx.coordinator.coordinator().pendingRecords(100L));
+        assertEquals(Set.of(), ctx.coordinator.coordinator().pendingRecords(100L));
         assertEquals(Set.of("record#1", "record#2", "record#3"), ctx.coordinator.coordinator().records());
         assertEquals(List.of(
             records(timer.time().milliseconds(), "record#1"),
@@ -4168,7 +4168,7 @@ public class CoordinatorRuntimeTest {
 
         // Schedule a write operation that does not generate any records.
         CompletableFuture<String> write = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
-            state -> new CoordinatorResult<>(Collections.emptyList(), "response1"));
+            state -> new CoordinatorResult<>(List.of(), "response1"));
 
         // The write operation should not be done.
         assertFalse(write.isDone());
@@ -4356,7 +4356,7 @@ public class CoordinatorRuntimeTest {
             new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
             new MockCoordinatorShard.RecordAndMetadata(2, records.get(2))
         ), ctx.coordinator.coordinator().fullRecords());
-        assertEquals(Collections.emptyList(), writer.entries(TP));
+        assertEquals(List.of(), writer.entries(TP));
 
         // Let's write the 4th record which is too large. This will flush the current
         // pending batch, allocate a new batch, and put the record into it.
@@ -4454,7 +4454,7 @@ public class CoordinatorRuntimeTest {
             new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
             new MockCoordinatorShard.RecordAndMetadata(2, records.get(2))
         ), ctx.coordinator.coordinator().fullRecords());
-        assertEquals(Collections.emptyList(), writer.entries(TP));
+        assertEquals(List.of(), writer.entries(TP));
 
         // Write #4. This write cannot make it in the current batch. So the current batch
         // is flushed. It will fail. So we expect all writes to fail.
@@ -4472,8 +4472,8 @@ public class CoordinatorRuntimeTest {
         assertEquals(0L, ctx.coordinator.lastWrittenOffset());
         assertEquals(0L, ctx.coordinator.lastCommittedOffset());
         assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
-        assertEquals(Collections.emptyList(), ctx.coordinator.coordinator().fullRecords());
-        assertEquals(Collections.emptyList(), writer.entries(TP));
+        assertEquals(List.of(), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(List.of(), writer.entries(TP));
     }
 
     @Test
@@ -4516,7 +4516,7 @@ public class CoordinatorRuntimeTest {
 
         // Write #2, with no records.
         CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
-            state -> new CoordinatorResult<>(Collections.emptyList(), "response2"));
+            state -> new CoordinatorResult<>(List.of(), "response2"));
 
         // Write #2 should not be attached to the empty batch.
         assertTrue(write2.isDone());
@@ -4601,7 +4601,7 @@ public class CoordinatorRuntimeTest {
         );
 
         // Verify the state. Records are replayed but no batch written.
-        assertEquals(Collections.emptyList(), writer.entries(TP));
+        assertEquals(List.of(), writer.entries(TP));
         verify(runtimeMetrics, times(0)).recordFlushTime(10);
 
         // Write #3 with one record. This one cannot go into the existing batch
@@ -4779,7 +4779,7 @@ public class CoordinatorRuntimeTest {
 
         // Records have been written to the log.
         long writeTimestamp = timer.time().milliseconds();
-        assertEquals(Collections.singletonList(
+        assertEquals(List.of(
             records(writeTimestamp, "record1")
         ), writer.entries(TP));
 
@@ -4923,10 +4923,10 @@ public class CoordinatorRuntimeTest {
                     (result, exception) -> {
                         assertEquals("task result", result);
                         assertNull(exception);
-                        return new CoordinatorResult<>(Collections.singletonList("record2"), null);
+                        return new CoordinatorResult<>(List.of("record2"), null);
                     }
                 );
-                return new CoordinatorResult<>(Collections.singletonList("record1"), "response1");
+                return new CoordinatorResult<>(List.of("record1"), "response1");
             }
         );
 
diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/InMemoryPartitionWriter.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/InMemoryPartitionWriter.java
index a8551f0734b..66cfbbe8b10 100644
--- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/InMemoryPartitionWriter.java
+++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/InMemoryPartitionWriter.java
@@ -89,7 +89,7 @@ public class InMemoryPartitionWriter implements PartitionWriter {
 
     @Override
     public LogConfig config(TopicPartition tp) {
-        return new LogConfig(Collections.emptyMap());
+        return new LogConfig(Map.of());
     }
 
     @Override
diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/KafkaMetricHistogramTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/KafkaMetricHistogramTest.java
index 72a4bce3fae..db3fb574448 100644
--- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/KafkaMetricHistogramTest.java
+++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/KafkaMetricHistogramTest.java
@@ -22,7 +22,7 @@ import org.apache.kafka.common.metrics.Metrics;
 
 import org.junit.jupiter.api.Test;
 
-import java.util.Collections;
+import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -42,11 +42,11 @@ public class KafkaMetricHistogramTest {
             );
 
             Set<MetricName> expected = Set.of(
-                new MetricName("test-metric-max", "test-group", "test description", Collections.emptyMap()),
-                new MetricName("test-metric-p999", "test-group", "test description", Collections.emptyMap()),
-                new MetricName("test-metric-p99", "test-group", "test description", Collections.emptyMap()),
-                new MetricName("test-metric-p95", "test-group", "test description", Collections.emptyMap()),
-                new MetricName("test-metric-p50", "test-group", "test description", Collections.emptyMap())
+                new MetricName("test-metric-max", "test-group", "test description", Map.of()),
+                new MetricName("test-metric-p999", "test-group", "test description", Map.of()),
+                new MetricName("test-metric-p99", "test-group", "test description", Map.of()),
+                new MetricName("test-metric-p95", "test-group", "test description", Map.of()),
+                new MetricName("test-metric-p50", "test-group", "test description", Map.of())
             );
             Set<MetricName> actual = histogram.stats().stream().map(CompoundStat.NamedMeasurable::name).collect(Collectors.toSet());
             assertEquals(expected, actual);
diff --git a/server/src/main/java/org/apache/kafka/network/ConnectionQuotaEntity.java b/server/src/main/java/org/apache/kafka/network/ConnectionQuotaEntity.java
index 5a2fddd7f1c..4487f4ab530 100644
--- a/server/src/main/java/org/apache/kafka/network/ConnectionQuotaEntity.java
+++ b/server/src/main/java/org/apache/kafka/network/ConnectionQuotaEntity.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.network;
 
 import java.net.InetAddress;
-import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
@@ -37,21 +36,21 @@ public class ConnectionQuotaEntity {
         return new ConnectionQuotaEntity(CONNECTION_RATE_SENSOR_NAME + "-" + listenerName,
                 CONNECTION_RATE_METRIC_NAME,
                 Long.MAX_VALUE,
-                Collections.singletonMap("listener", listenerName));
+                Map.of("listener", listenerName));
     }
 
     public static ConnectionQuotaEntity brokerQuotaEntity() {
         return new ConnectionQuotaEntity(CONNECTION_RATE_SENSOR_NAME,
                 "broker-" + ConnectionQuotaEntity.CONNECTION_RATE_METRIC_NAME,
                 Long.MAX_VALUE,
-                Collections.emptyMap());
+                Map.of());
     }
 
     public static ConnectionQuotaEntity ipQuotaEntity(InetAddress ip) {
         return new ConnectionQuotaEntity(CONNECTION_RATE_SENSOR_NAME + "-" + ip.getHostAddress(),
                 CONNECTION_RATE_METRIC_NAME,
                 TimeUnit.HOURS.toSeconds(1),
-                Collections.singletonMap(IP_METRIC_TAG, ip.getHostAddress()));
     }
 
     private final String sensorName;
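
One hedged aside on the file above: Map.of is null-hostile where
Collections.singletonMap was not, so call sites such as ipQuotaEntity()
now depend on their argument never being null. A minimal sketch with a
hypothetical tag value (editorial illustration, not code from this patch):

    import java.util.Collections;
    import java.util.Map;

    public class NullTagDemo {
        public static void main(String[] args) {
            String value = null; // hypothetical: a tag value that is unexpectedly null

            // The legacy helper tolerates a null value:
            System.out.println(Collections.singletonMap("ip", value)); // prints {ip=null}

            // The Java 9+ factory fails fast instead:
            try {
                Map.of("ip", value);
            } catch (NullPointerException e) {
                System.out.println("Map.of rejected the null value");
            }
        }
    }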
diff --git a/server/src/main/java/org/apache/kafka/network/metrics/RequestMetrics.java b/server/src/main/java/org/apache/kafka/network/metrics/RequestMetrics.java
index 991be1f59a6..bfdaad6d9d1 100644
--- a/server/src/main/java/org/apache/kafka/network/metrics/RequestMetrics.java
+++ b/server/src/main/java/org/apache/kafka/network/metrics/RequestMetrics.java
@@ -24,7 +24,6 @@ import org.apache.kafka.server.metrics.KafkaMetricsGroup;
 import com.yammer.metrics.core.Histogram;
 import com.yammer.metrics.core.Meter;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
@@ -87,7 +86,7 @@ public class RequestMetrics {
 
     public RequestMetrics(String name) {
         this.name = name;
-        tags = Collections.singletonMap("request", name);
+        tags = Map.of("request", name);
         // time a request spent in a request queue
         requestQueueTimeHist = metricsGroup.newHistogram(REQUEST_QUEUE_TIME_MS, true, tags);
         // time a request takes to be processed at the local broker
diff --git a/server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java b/server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java
index 4b1b12aefdc..7aa0042b133 100644
--- a/server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java
+++ b/server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.resource.ResourceType;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -62,7 +61,7 @@ public class AclEntry {
             case TRANSACTIONAL_ID:
                 return new HashSet<>(Arrays.asList(DESCRIBE, WRITE));
             case DELEGATION_TOKEN:
-                return Collections.singleton(DESCRIBE);
+                return Set.of(DESCRIBE);
             case USER:
                 return new HashSet<>(Arrays.asList(CREATE_TOKENS, DESCRIBE_TOKENS));
             default:
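
One nuance in AclEntry above: Set.of(DESCRIBE) is unmodifiable, while the
multi-element branches keep the mutable new HashSet<>(Arrays.asList(...))
form. A minimal sketch of that difference (the idea that some callers may
mutate the returned set is an inference, not something the patch states):

    import java.util.HashSet;
    import java.util.Set;

    public class MutabilityDemo {
        public static void main(String[] args) {
            // Set.of returns an unmodifiable set; fine for read-only callers.
            Set<String> immutable = Set.of("DESCRIBE");

            // new HashSet<>(...) stays mutable, so callers can add to it.
            Set<String> mutable = new HashSet<>(Set.of("CREATE_TOKENS", "DESCRIBE_TOKENS"));
            mutable.add("DESCRIBE"); // allowed

            // immutable.add("WRITE") would throw UnsupportedOperationException.
            System.out.println(immutable + " " + mutable);
        }
    }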
diff --git a/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java b/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java
index 3605a175e08..b6223418ab5 100644
--- a/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java
+++ b/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java
@@ -46,7 +46,6 @@ import com.yammer.metrics.core.MetricsRegistry;
 import org.slf4j.Logger;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -181,7 +180,7 @@ public final class AssignmentsManager {
         this.directoryIdToDescription = directoryIdToDescription;
         this.metadataImageSupplier = metadataImageSupplier;
         this.ready = new ConcurrentHashMap<>();
-        this.inflight = Collections.emptyMap();
+        this.inflight = Map.of();
         this.metricsRegistry = metricsRegistry;
         this.metricsRegistry.newGauge(QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC, new Gauge<Integer>() {
                 @Override
@@ -363,7 +362,7 @@ public final class AssignmentsManager {
         Map<TopicIdPartition, Assignment> sent,
         Optional<ClientResponse> assignmentResponse
     ) {
-        inflight = Collections.emptyMap();
+        inflight = Map.of();
         Optional<String> globalResponseError = globalResponseError(assignmentResponse);
         if (globalResponseError.isPresent()) {
             previousGlobalFailures++;
diff --git a/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java b/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
index 96eb4288dad..5b77004d0ae 100644
--- a/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
+++ b/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
@@ -596,7 +596,7 @@ public class ClientMetricsManager implements AutoCloseable {
             Sensor unknownSubscriptionRequestCountSensor = metrics.sensor(
                 ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST);
             unknownSubscriptionRequestCountSensor.add(createMeter(metrics, new WindowedCount(),
-                ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST, Collections.emptyMap()));
+                ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST, Map.of()));
             sensorsName.add(unknownSubscriptionRequestCountSensor.name());
         }
 
@@ -607,7 +607,7 @@ public class ClientMetricsManager implements AutoCloseable {
                 return;
             }
 
-            Map<String, String> tags = Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, clientInstanceId.toString());
+            Map<String, String> tags = Map.of(ClientMetricsConfigs.CLIENT_INSTANCE_ID, clientInstanceId.toString());
 
             Sensor throttleCount = metrics.sensor(ClientMetricsStats.THROTTLE + "-" + clientInstanceId);
             throttleCount.add(createMeter(metrics, new WindowedCount(), ClientMetricsStats.THROTTLE, tags));
diff --git a/server/src/main/java/org/apache/kafka/server/metrics/BrokerServerMetrics.java b/server/src/main/java/org/apache/kafka/server/metrics/BrokerServerMetrics.java
index 617a1542864..7d135d703c2 100644
--- a/server/src/main/java/org/apache/kafka/server/metrics/BrokerServerMetrics.java
+++ b/server/src/main/java/org/apache/kafka/server/metrics/BrokerServerMetrics.java
@@ -23,7 +23,7 @@ import org.apache.kafka.image.MetadataProvenance;
 
 import com.yammer.metrics.core.Histogram;
 
-import java.util.Collections;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -36,7 +36,7 @@ public final class BrokerServerMetrics implements AutoCloseable {
 
     private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.server", "BrokerMetadataListener");
     private final com.yammer.metrics.core.MetricName batchProcessingTimeHistName =
-            metricsGroup.metricName("MetadataBatchProcessingTimeUs", Collections.emptyMap());
+            metricsGroup.metricName("MetadataBatchProcessingTimeUs", Map.of());
 
     /**
      * A histogram tracking the time in microseconds it took to process batches of events.
@@ -45,7 +45,7 @@ public final class BrokerServerMetrics implements AutoCloseable {
             .newHistogram(batchProcessingTimeHistName, true);
 
     private final com.yammer.metrics.core.MetricName batchSizeHistName =
-            metricsGroup.metricName("MetadataBatchSizes", Collections.emptyMap());
+            metricsGroup.metricName("MetadataBatchSizes", Map.of());
 
     /**
      * A histogram tracking the sizes of batches that we have processed.
diff --git a/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java b/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java
index 830bd29a243..ba94486e83c 100644
--- a/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java
+++ b/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java
@@ -24,7 +24,6 @@ import org.apache.kafka.common.errors.InvalidConfigurationException;
 import org.apache.kafka.common.errors.InvalidRequestException;
 
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -99,9 +98,9 @@ public class ClientMetricsConfigs extends AbstractConfig {
     ));
 
     private static final ConfigDef CONFIG = new ConfigDef()
-        .define(SUBSCRIPTION_METRICS, Type.LIST, Collections.emptyList(), Importance.MEDIUM, "Subscription metrics list")
+        .define(SUBSCRIPTION_METRICS, Type.LIST, List.of(), Importance.MEDIUM, "Subscription metrics list")
         .define(PUSH_INTERVAL_MS, Type.INT, DEFAULT_INTERVAL_MS, Importance.MEDIUM, "Push interval in milliseconds")
-        .define(CLIENT_MATCH_PATTERN, Type.LIST, Collections.emptyList(), Importance.MEDIUM, "Client match pattern list");
+        .define(CLIENT_MATCH_PATTERN, Type.LIST, List.of(), Importance.MEDIUM, "Client match pattern list");
 
     public ClientMetricsConfigs(Properties props) {
         super(CONFIG, props);
@@ -165,7 +164,7 @@ public class ClientMetricsConfigs extends AbstractConfig {
      */
     public static Map<String, Pattern> parseMatchingPatterns(List<String> patterns) {
         if (patterns == null || patterns.isEmpty()) {
-            return Collections.emptyMap();
+            return Map.of();
         }
 
         Map<String, Pattern> patternsMap = new HashMap<>();
diff --git a/server/src/main/java/org/apache/kafka/server/share/context/FinalContext.java b/server/src/main/java/org/apache/kafka/server/share/context/FinalContext.java
index ab767cdb237..1be37ae87b7 100644
--- a/server/src/main/java/org/apache/kafka/server/share/context/FinalContext.java
+++ b/server/src/main/java/org/apache/kafka/server/share/context/FinalContext.java
@@ -27,8 +27,8 @@ import org.apache.kafka.server.share.ErroneousAndValidPartitionData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collections;
 import java.util.LinkedHashMap;
+import java.util.List;
 
 /**
  * The share fetch context for a final share fetch request.
@@ -55,7 +55,7 @@ public class FinalContext extends ShareFetchContext {
                                                      LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> updates) {
         log.debug("Final context returning {}", partitionsToLogString(updates.keySet()));
         return new ShareFetchResponse(ShareFetchResponse.toMessage(Errors.NONE, 0,
-                updates.entrySet().iterator(), Collections.emptyList()));
+                updates.entrySet().iterator(), List.of()));
     }
 
     @Override
diff --git a/server/src/main/java/org/apache/kafka/server/share/context/ShareFetchContext.java b/server/src/main/java/org/apache/kafka/server/share/context/ShareFetchContext.java
index 2c472660d49..36062616cec 100644
--- a/server/src/main/java/org/apache/kafka/server/share/context/ShareFetchContext.java
+++ b/server/src/main/java/org/apache/kafka/server/share/context/ShareFetchContext.java
@@ -27,6 +27,7 @@ import org.apache.kafka.server.share.session.ShareSession;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashMap;
+import java.util.List;
 
 /**
  * The context for every share fetch request. The context is responsible for tracking the topic partitions present in
@@ -50,7 +51,7 @@ public abstract class ShareFetchContext {
      */
     public ShareFetchResponse throttleResponse(int throttleTimeMs) {
         return new ShareFetchResponse(ShareFetchResponse.toMessage(Errors.NONE, throttleTimeMs,
-                Collections.emptyIterator(), Collections.emptyList()));
+                Collections.emptyIterator(), List.of()));
     }
 
     /**
diff --git a/server/src/main/java/org/apache/kafka/server/share/context/ShareSessionContext.java b/server/src/main/java/org/apache/kafka/server/share/context/ShareSessionContext.java
index 1936ec1f0cc..18df489af33 100644
--- a/server/src/main/java/org/apache/kafka/server/share/context/ShareSessionContext.java
+++ b/server/src/main/java/org/apache/kafka/server/share/context/ShareSessionContext.java
@@ -38,6 +38,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NoSuchElementException;
@@ -103,7 +104,7 @@ public class ShareSessionContext extends ShareFetchContext {
     public ShareFetchResponse throttleResponse(int throttleTimeMs) {
         if (!isSubsequent) {
             return new ShareFetchResponse(ShareFetchResponse.toMessage(Errors.NONE, throttleTimeMs,
-                    Collections.emptyIterator(), Collections.emptyList()));
+                    Collections.emptyIterator(), List.of()));
         }
         int expectedEpoch = ShareRequestMetadata.nextEpoch(reqMetadata.epoch());
         int sessionEpoch;
@@ -114,10 +115,10 @@ public class ShareSessionContext extends ShareFetchContext {
             log.debug("Subsequent share session {} expected epoch {}, but got {}. " +
                     "Possible duplicate request.", session.key(), expectedEpoch, sessionEpoch);
             return new ShareFetchResponse(ShareFetchResponse.toMessage(Errors.INVALID_SHARE_SESSION_EPOCH,
-                    throttleTimeMs, Collections.emptyIterator(), Collections.emptyList()));
+                    throttleTimeMs, Collections.emptyIterator(), List.of()));
         }
         return new ShareFetchResponse(ShareFetchResponse.toMessage(Errors.NONE, throttleTimeMs,
-                Collections.emptyIterator(), Collections.emptyList()));
+                Collections.emptyIterator(), List.of()));
     }
 
     /**
@@ -196,7 +197,7 @@ public class ShareSessionContext extends ShareFetchContext {
                                                      LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> updates) {
         if (!isSubsequent) {
             return new ShareFetchResponse(ShareFetchResponse.toMessage(
-                    Errors.NONE, 0, updates.entrySet().iterator(), Collections.emptyList()));
+                    Errors.NONE, 0, updates.entrySet().iterator(), List.of()));
         } else {
             int expectedEpoch = ShareRequestMetadata.nextEpoch(reqMetadata.epoch());
             int sessionEpoch;
@@ -207,7 +208,7 @@ public class ShareSessionContext extends ShareFetchContext {
                 log.debug("Subsequent share session {} expected epoch {}, but got {}. Possible duplicate request.",
                         session.key(), expectedEpoch, sessionEpoch);
                 return new ShareFetchResponse(ShareFetchResponse.toMessage(Errors.INVALID_SHARE_SESSION_EPOCH,
-                        0, Collections.emptyIterator(), Collections.emptyList()));
+                        0, Collections.emptyIterator(), List.of()));
             }
             // Iterate over the update list using PartitionIterator. This will prune updates which don't need to be sent
             Iterator<Map.Entry<TopicIdPartition, ShareFetchResponseData.PartitionData>> partitionIterator = new PartitionIterator(
@@ -218,7 +219,7 @@ public class ShareSessionContext extends ShareFetchContext {
             log.debug("Subsequent share session context with session key {} returning {}", session.key(),
                     partitionsToLogString(updates.keySet()));
             return new ShareFetchResponse(ShareFetchResponse.toMessage(
-                    Errors.NONE, 0, updates.entrySet().iterator(), Collections.emptyList()));
+                    Errors.NONE, 0, updates.entrySet().iterator(), List.of()));
         }
     }
 
diff --git a/server/src/test/java/org/apache/kafka/network/RequestConvertToJsonTest.java b/server/src/test/java/org/apache/kafka/network/RequestConvertToJsonTest.java
index 3bcb5136699..701cfae1fa6 100644
--- a/server/src/test/java/org/apache/kafka/network/RequestConvertToJsonTest.java
+++ b/server/src/test/java/org/apache/kafka/network/RequestConvertToJsonTest.java
@@ -36,7 +36,6 @@ import org.junit.jupiter.api.Test;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -70,7 +69,7 @@ public class RequestConvertToJsonTest {
                 }
             }
         }
-        assertEquals(Collections.emptyList(), unhandledKeys, "Unhandled request keys");
+        assertEquals(List.of(), unhandledKeys, "Unhandled request keys");
     }
 
     @Test
@@ -116,7 +115,7 @@ public class RequestConvertToJsonTest {
                 }
             }
         }
-        assertEquals(Collections.emptyList(), unhandledKeys, "Unhandled response keys");
+        assertEquals(List.of(), unhandledKeys, "Unhandled response keys");
     }
 
     @Test
diff --git a/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java b/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java
index f426c355ceb..34af19cfa37 100644
--- a/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java
+++ b/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java
@@ -58,7 +58,6 @@ import org.slf4j.LoggerFactory;
 import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -490,25 +489,25 @@ public class AssignmentsManagerTest {
                     setTopics(Arrays.asList(
                         new AssignReplicasToDirsRequestData.TopicData().
                             setTopicId(TOPIC_1).
-                            setPartitions(Collections.singletonList(
+                            setPartitions(List.of(
                                 new AssignReplicasToDirsRequestData.PartitionData().
                                     setPartitionIndex(2))),
                         new AssignReplicasToDirsRequestData.TopicData().
                             setTopicId(TOPIC_2).
-                            setPartitions(Collections.singletonList(
+                            setPartitions(List.of(
                                 new AssignReplicasToDirsRequestData.PartitionData().
                                     setPartitionIndex(5))))),
                 new AssignReplicasToDirsRequestData.DirectoryData().
                     setId(DIR_3).
-                    setTopics(Collections.singletonList(
+                    setTopics(List.of(
                         new AssignReplicasToDirsRequestData.TopicData().
                             setTopicId(TOPIC_1).
-                            setPartitions(Collections.singletonList(
+                            setPartitions(List.of(
                                 new AssignReplicasToDirsRequestData.PartitionData().
                                     setPartitionIndex(3))))),
                 new AssignReplicasToDirsRequestData.DirectoryData().
                     setId(DIR_1).
-                    setTopics(Collections.singletonList(
+                    setTopics(List.of(
                         new AssignReplicasToDirsRequestData.TopicData().
                             setTopicId(TOPIC_1).
                             setPartitions(Arrays.asList(
diff --git a/server/src/test/java/org/apache/kafka/server/metrics/BrokerServerMetricsTest.java b/server/src/test/java/org/apache/kafka/server/metrics/BrokerServerMetricsTest.java
index 6b5c84c8aea..20fa408d358 100644
--- a/server/src/test/java/org/apache/kafka/server/metrics/BrokerServerMetricsTest.java
+++ b/server/src/test/java/org/apache/kafka/server/metrics/BrokerServerMetricsTest.java
@@ -25,7 +25,6 @@ import org.apache.kafka.image.MetadataProvenance;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
@@ -43,12 +42,12 @@ public final class BrokerServerMetricsTest {
 
         // Metric description is not use for metric name equality
         Set<MetricName> expectedMetrics = new HashSet<>(Arrays.asList(
-                new MetricName("last-applied-record-offset", expectedGroup, "", Collections.emptyMap()),
-                new MetricName("last-applied-record-timestamp", expectedGroup, "", Collections.emptyMap()),
-                new MetricName("last-applied-record-lag-ms", expectedGroup, "", Collections.emptyMap()),
-                new MetricName("metadata-load-error-count", expectedGroup, "", Collections.emptyMap()),
-                new MetricName("metadata-apply-error-count", expectedGroup, "", Collections.emptyMap()),
-                new MetricName("ignored-static-voters", expectedGroup, "", Collections.emptyMap())
+                new MetricName("last-applied-record-offset", expectedGroup, "", Map.of()),
+                new MetricName("last-applied-record-timestamp", expectedGroup, "", Map.of()),
+                new MetricName("last-applied-record-lag-ms", expectedGroup, "", Map.of()),
+                new MetricName("metadata-load-error-count", expectedGroup, "", Map.of()),
+                new MetricName("metadata-apply-error-count", expectedGroup, "", Map.of()),
+                new MetricName("ignored-static-voters", expectedGroup, "", Map.of())
         ));
 
         try (BrokerServerMetrics ignored = new BrokerServerMetrics(metrics)) {
diff --git a/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsInstanceMetadataTest.java b/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsInstanceMetadataTest.java
index 8bf22e7f90f..4bf26ba55ca 100644
--- a/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsInstanceMetadataTest.java
+++ b/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsInstanceMetadataTest.java
@@ -22,7 +22,6 @@ import org.junit.jupiter.api.Test;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.regex.Pattern;
@@ -37,25 +36,25 @@ public class ClientMetricsInstanceMetadataTest {
         Uuid uuid = Uuid.randomUuid();
         ClientMetricsInstanceMetadata instanceMetadata = new ClientMetricsInstanceMetadata(uuid, ClientMetricsTestUtils.requestContext());
         // We consider empty/missing client matching patterns as valid
-        assertTrue(instanceMetadata.isMatch(Collections.emptyMap()));
+        assertTrue(instanceMetadata.isMatch(Map.of()));
 
         assertTrue(instanceMetadata.isMatch(
-            Collections.singletonMap(ClientMetricsConfigs.CLIENT_ID, Pattern.compile(".*"))));
+            Map.of(ClientMetricsConfigs.CLIENT_ID, Pattern.compile(".*"))));
         assertTrue(instanceMetadata.isMatch(
-            Collections.singletonMap(ClientMetricsConfigs.CLIENT_ID, Pattern.compile("producer-1"))));
+            Map.of(ClientMetricsConfigs.CLIENT_ID, Pattern.compile("producer-1"))));
         assertTrue(instanceMetadata.isMatch(
-            Collections.singletonMap(ClientMetricsConfigs.CLIENT_ID, Pattern.compile("producer.*"))));
+            Map.of(ClientMetricsConfigs.CLIENT_ID, Pattern.compile("producer.*"))));
         assertTrue(instanceMetadata.isMatch(
-            Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID, Pattern.compile(uuid.toString()))));
+            Map.of(ClientMetricsConfigs.CLIENT_INSTANCE_ID, Pattern.compile(uuid.toString()))));
         assertTrue(instanceMetadata.isMatch(
-            Collections.singletonMap(ClientMetricsConfigs.CLIENT_SOFTWARE_NAME, Pattern.compile("apache-kafka-java"))));
+            Map.of(ClientMetricsConfigs.CLIENT_SOFTWARE_NAME, Pattern.compile("apache-kafka-java"))));
         assertTrue(instanceMetadata.isMatch(
-            Collections.singletonMap(ClientMetricsConfigs.CLIENT_SOFTWARE_VERSION, Pattern.compile("3.5.2"))));
+            Map.of(ClientMetricsConfigs.CLIENT_SOFTWARE_VERSION, Pattern.compile("3.5.2"))));
         assertTrue(instanceMetadata.isMatch(
-            Collections.singletonMap(ClientMetricsConfigs.CLIENT_SOURCE_ADDRESS, Pattern.compile(
+            Map.of(ClientMetricsConfigs.CLIENT_SOURCE_ADDRESS, Pattern.compile(
                 InetAddress.getLocalHost().getHostAddress()))));
         assertTrue(instanceMetadata.isMatch(
-            Collections.singletonMap(ClientMetricsConfigs.CLIENT_SOURCE_PORT, Pattern.compile(
+            Map.of(ClientMetricsConfigs.CLIENT_SOURCE_PORT, Pattern.compile(
                 String.valueOf(ClientMetricsTestUtils.CLIENT_PORT)))));
     }
 
@@ -125,9 +124,9 @@ public class ClientMetricsInstanceMetadataTest {
             ClientMetricsTestUtils.requestContext());
 
         // Unknown key in pattern map
-        assertFalse(instanceMetadata.isMatch(Collections.singletonMap("unknown", Pattern.compile(".*"))));
+        assertFalse(instanceMetadata.isMatch(Map.of("unknown", Pattern.compile(".*"))));
         // '*' key is considered as invalid regex pattern
-        assertFalse(instanceMetadata.isMatch(Collections.singletonMap("*", Pattern.compile(".*"))));
+        assertFalse(instanceMetadata.isMatch(Map.of("*", Pattern.compile(".*"))));
     }
 
     @Test
@@ -136,9 +135,9 @@ public class ClientMetricsInstanceMetadataTest {
         ClientMetricsInstanceMetadata instanceMetadata = new ClientMetricsInstanceMetadata(uuid,
             ClientMetricsTestUtils.requestContextWithNullClientInfo());
 
-        assertFalse(instanceMetadata.isMatch(Collections.singletonMap(ClientMetricsConfigs.CLIENT_SOFTWARE_NAME,
+        assertFalse(instanceMetadata.isMatch(Map.of(ClientMetricsConfigs.CLIENT_SOFTWARE_NAME,
             Pattern.compile(".*"))));
-        assertFalse(instanceMetadata.isMatch(Collections.singletonMap(ClientMetricsConfigs.CLIENT_SOFTWARE_VERSION,
+        assertFalse(instanceMetadata.isMatch(Map.of(ClientMetricsConfigs.CLIENT_SOFTWARE_VERSION,
             Pattern.compile(".*"))));
     }
 }
diff --git a/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java b/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java
index 7b8d42cd859..d5d42c5a3ec 100644
--- a/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java
+++ b/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java
@@ -24,8 +24,8 @@ import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -61,7 +61,7 @@ public class ShareSessionCacheTest {
         assertEquals(0, cache.totalPartitions());
         ShareSessionKey key1 = cache.maybeCreateSession("grp", Uuid.randomUuid(), 0, mockedSharePartitionMap(2));
         assertNotNull(key1);
-        assertShareCacheContains(cache, new ArrayList<>(Collections.singletonList(key1)));
+        assertShareCacheContains(cache, new ArrayList<>(List.of(key1)));
         ShareSession session1 = cache.get(key1);
         assertEquals(2, session1.size());
         assertEquals(2, cache.totalPartitions());
@@ -82,7 +82,7 @@ public class ShareSessionCacheTest {
         assertEquals(6, cache.totalPartitions());
         assertEquals(2, cache.size());
         cache.remove(key1);
-        assertShareCacheContains(cache, new ArrayList<>(Collections.singletonList(key2)));
+        assertShareCacheContains(cache, new ArrayList<>(List.of(key2)));
         assertEquals(1, cache.size());
         assertEquals(4, cache.totalPartitions());
 
diff --git a/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionTest.java b/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionTest.java
index a9022bb2fea..6a30ac97ce4 100644
--- a/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionTest.java
+++ b/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionTest.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.Uuid;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -46,10 +45,10 @@ public class ShareSessionTest {
 
     @Test
     public void testPartitionsToLogStringEmpty() {
-        String response = ShareSession.partitionsToLogString(Collections.emptyList(), false);
+        String response = ShareSession.partitionsToLogString(List.of(), false);
         assertEquals("0 partition(s)", response);
 
-        response = ShareSession.partitionsToLogString(Collections.emptyList(), true);
+        response = ShareSession.partitionsToLogString(List.of(), true);
         assertEquals("( [] )", response);
     }
 }
