This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a39fcac95c8 MINOR: Clean up coordinator-common and server modules
(#19009)
a39fcac95c8 is described below
commit a39fcac95c82133ac6d9116216ae819d0bf9a6bd
Author: Sanskar Jhajharia <[email protected]>
AuthorDate: Thu Feb 27 14:39:21 2025 +0530
MINOR: Clean up coordinator-common and server modules (#19009)
Given that we now support Java 17 on our brokers, this PR replaces the
use of:
Collections.singletonList() and Collections.emptyList() with List.of()
Collections.singletonMap() and Collections.emptyMap() with Map.of()
Collections.singleton() and Collections.emptySet() with Set.of()
Affected modules: server and coordinator-common
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../common/runtime/CoordinatorRuntime.java | 3 +-
.../runtime/CoordinatorExecutorImplTest.java | 12 ++--
.../common/runtime/CoordinatorResultTest.java | 10 +--
.../common/runtime/CoordinatorRuntimeTest.java | 76 +++++++++++-----------
.../common/runtime/InMemoryPartitionWriter.java | 2 +-
.../common/runtime/KafkaMetricHistogramTest.java | 12 ++--
.../kafka/network/ConnectionQuotaEntity.java | 7 +-
.../kafka/network/metrics/RequestMetrics.java | 3 +-
.../apache/kafka/security/authorizer/AclEntry.java | 3 +-
.../apache/kafka/server/AssignmentsManager.java | 5 +-
.../apache/kafka/server/ClientMetricsManager.java | 4 +-
.../kafka/server/metrics/BrokerServerMetrics.java | 6 +-
.../kafka/server/metrics/ClientMetricsConfigs.java | 7 +-
.../kafka/server/share/context/FinalContext.java | 4 +-
.../server/share/context/ShareFetchContext.java | 3 +-
.../server/share/context/ShareSessionContext.java | 13 ++--
.../kafka/network/RequestConvertToJsonTest.java | 5 +-
.../kafka/server/AssignmentsManagerTest.java | 11 ++--
.../server/metrics/BrokerServerMetricsTest.java | 13 ++--
.../metrics/ClientMetricsInstanceMetadataTest.java | 27 ++++----
.../share/session/ShareSessionCacheTest.java | 6 +-
.../server/share/session/ShareSessionTest.java | 5 +-
22 files changed, 114 insertions(+), 123 deletions(-)
diff --git
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
index 1e9724a57aa..283a8df49ab 100644
---
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
+++
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
@@ -52,7 +52,6 @@ import org.slf4j.Logger;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -2534,7 +2533,7 @@ public class CoordinatorRuntime<S extends
CoordinatorShard<U>, U> implements Aut
*/
public List<TopicPartition> activeTopicPartitions() {
if (coordinators == null || coordinators.isEmpty()) {
- return Collections.emptyList();
+ return List.of();
}
return coordinators.entrySet().stream()
diff --git
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImplTest.java
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImplTest.java
index d5ac1be7820..4f5e917f179 100644
---
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImplTest.java
+++
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImplTest.java
@@ -23,7 +23,7 @@ import org.apache.kafka.server.util.FutureUtils;
import org.junit.jupiter.api.Test;
import java.time.Duration;
-import java.util.Collections;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
@@ -71,7 +71,7 @@ public class CoordinatorExecutorImplTest {
CoordinatorRuntime.CoordinatorWriteOperation<CoordinatorShard<String>, Void,
String> op =
args.getArgument(3);
assertEquals(
- new CoordinatorResult<>(Collections.singletonList("record"),
null),
+ new CoordinatorResult<>(List.of("record"), null),
op.generateRecordsAndResult(coordinatorShard)
);
return CompletableFuture.completedFuture(null);
@@ -95,7 +95,7 @@ public class CoordinatorExecutorImplTest {
operationCalled.set(true);
assertEquals("Hello!", result);
assertNull(exception);
- return new
CoordinatorResult<>(Collections.singletonList("record"), null);
+ return new CoordinatorResult<>(List.of("record"), null);
};
executor.schedule(
@@ -130,7 +130,7 @@ public class CoordinatorExecutorImplTest {
CoordinatorRuntime.CoordinatorWriteOperation<CoordinatorShard<String>, Void,
String> op =
args.getArgument(3);
assertEquals(
- new CoordinatorResult<>(Collections.emptyList(), null),
+ new CoordinatorResult<>(List.of(), null),
op.generateRecordsAndResult(coordinatorShard)
);
return CompletableFuture.completedFuture(null);
@@ -154,7 +154,7 @@ public class CoordinatorExecutorImplTest {
assertNull(result);
assertNotNull(exception);
assertEquals("Oh no!", exception.getMessage());
- return new CoordinatorResult<>(Collections.emptyList(), null);
+ return new CoordinatorResult<>(List.of(), null);
};
executor.schedule(
@@ -301,7 +301,7 @@ public class CoordinatorExecutorImplTest {
AtomicBoolean operationCalled = new AtomicBoolean(false);
CoordinatorExecutor.TaskOperation<String, String> taskOperation =
(result, exception) -> {
operationCalled.set(true);
- return new CoordinatorResult<>(Collections.emptyList(), null);
+ return new CoordinatorResult<>(List.of(), null);
};
executor.schedule(
diff --git
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorResultTest.java
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorResultTest.java
index 8d050cb1e07..263f14859ff 100644
---
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorResultTest.java
+++
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorResultTest.java
@@ -18,7 +18,7 @@ package org.apache.kafka.coordinator.common.runtime;
import org.junit.jupiter.api.Test;
-import java.util.Collections;
+import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -26,8 +26,8 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
public class CoordinatorResultTest {
@Test
public void testAttributes() {
- CoordinatorResult<String, CoordinatorRecord> result = new
CoordinatorResult<>(Collections.emptyList(), "response");
- assertEquals(Collections.emptyList(), result.records());
+ CoordinatorResult<String, CoordinatorRecord> result = new
CoordinatorResult<>(List.of(), "response");
+ assertEquals(List.of(), result.records());
assertEquals("response", result.response());
}
@@ -38,8 +38,8 @@ public class CoordinatorResultTest {
@Test
public void testEquals() {
- CoordinatorResult<String, CoordinatorRecord> result1 = new
CoordinatorResult<>(Collections.emptyList(), "response");
- CoordinatorResult<String, CoordinatorRecord> result2 = new
CoordinatorResult<>(Collections.emptyList(), "response");
+ CoordinatorResult<String, CoordinatorRecord> result1 = new
CoordinatorResult<>(List.of(), "response");
+ CoordinatorResult<String, CoordinatorRecord> result2 = new
CoordinatorResult<>(List.of(), "response");
assertEquals(result1, result2);
}
}
diff --git
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
index c57bfcaae39..72f31b0a7d8 100644
---
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
+++
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
@@ -60,11 +60,11 @@ import java.nio.charset.Charset;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.Comparator;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.OptionalInt;
import java.util.Set;
@@ -237,7 +237,7 @@ public class CoordinatorRuntimeTest {
}
public MockCoordinatorLoader() {
- this(null, Collections.emptyList(), Collections.emptyList());
+ this(null, List.of(), List.of());
}
@Override
@@ -448,7 +448,7 @@ public class CoordinatorRuntimeTest {
Set<String> pendingRecords(long producerId) {
TimelineHashSet<RecordAndMetadata> pending =
pendingRecords.get(producerId);
- if (pending == null) return Collections.emptySet();
+ if (pending == null) return Set.of();
return pending.stream().map(record ->
record.record).collect(Collectors.toUnmodifiableSet());
}
@@ -1342,7 +1342,7 @@ public class CoordinatorRuntimeTest {
// Write #3 but without any records.
CompletableFuture<String> write3 =
runtime.scheduleWriteOperation("write#3", TP, DEFAULT_WRITE_TIMEOUT,
- state -> new CoordinatorResult<>(Collections.emptyList(),
"response3"));
+ state -> new CoordinatorResult<>(List.of(), "response3"));
// Verify that the write is not committed yet.
assertFalse(write3.isDone());
@@ -1385,7 +1385,7 @@ public class CoordinatorRuntimeTest {
// Write #4 but without records.
CompletableFuture<String> write4 =
runtime.scheduleWriteOperation("write#4", TP, DEFAULT_WRITE_TIMEOUT,
- state -> new CoordinatorResult<>(Collections.emptyList(),
"response4"));
+ state -> new CoordinatorResult<>(List.of(), "response4"));
// It is completed immediately because the state is fully committed.
assertTrue(write4.isDone());
@@ -1414,7 +1414,7 @@ public class CoordinatorRuntimeTest {
// Scheduling a write fails with a NotCoordinatorException because the
coordinator
// does not exist.
CompletableFuture<String> write =
runtime.scheduleWriteOperation("write", TP, DEFAULT_WRITE_TIMEOUT,
- state -> new CoordinatorResult<>(Collections.emptyList(),
"response1"));
+ state -> new CoordinatorResult<>(List.of(), "response1"));
assertFutureThrows(NotCoordinatorException.class, write);
}
@@ -1696,7 +1696,7 @@ public class CoordinatorRuntimeTest {
verify(writer, times(1)).registerListener(eq(TP), any());
// Prepare the log config.
- when(writer.config(TP)).thenReturn(new
LogConfig(Collections.emptyMap()));
+ when(writer.config(TP)).thenReturn(new LogConfig(Map.of()));
// Prepare the transaction verification.
VerificationGuard guard = new VerificationGuard();
@@ -1910,7 +1910,7 @@ public class CoordinatorRuntimeTest {
expectedType = ControlRecordType.COMMIT;
} else {
// Or they are gone if aborted.
- assertEquals(Collections.emptySet(),
ctx.coordinator.coordinator().records());
+ assertEquals(Set.of(), ctx.coordinator.coordinator().records());
expectedType = ControlRecordType.ABORT;
}
@@ -2039,7 +2039,7 @@ public class CoordinatorRuntimeTest {
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(List.of(0L, 2L),
ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(Set.of("record1", "record2"),
ctx.coordinator.coordinator().pendingRecords(100L));
- assertEquals(Collections.emptySet(),
ctx.coordinator.coordinator().records());
+ assertEquals(Set.of(), ctx.coordinator.coordinator().records());
// Complete transaction #1. It should fail.
CompletableFuture<Void> complete1 =
runtime.scheduleTransactionCompletion(
@@ -2058,7 +2058,7 @@ public class CoordinatorRuntimeTest {
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(List.of(0L, 2L),
ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(Set.of("record1", "record2"),
ctx.coordinator.coordinator().pendingRecords(100L));
- assertEquals(Collections.emptySet(),
ctx.coordinator.coordinator().records());
+ assertEquals(Set.of(), ctx.coordinator.coordinator().records());
}
@Test
@@ -2125,7 +2125,7 @@ public class CoordinatorRuntimeTest {
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(List.of(0L, 2L),
ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(Set.of("record1", "record2"),
ctx.coordinator.coordinator().pendingRecords(100L));
- assertEquals(Collections.emptySet(),
ctx.coordinator.coordinator().records());
+ assertEquals(Set.of(), ctx.coordinator.coordinator().records());
assertEquals(List.of(
transactionalRecords(100L, (short) 5, timer.time().milliseconds(),
"record1", "record2")
), writer.entries(TP));
@@ -2147,7 +2147,7 @@ public class CoordinatorRuntimeTest {
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(List.of(0L, 2L),
ctx.coordinator.snapshotRegistry().epochsList());
assertEquals(Set.of("record1", "record2"),
ctx.coordinator.coordinator().pendingRecords(100L));
- assertEquals(Collections.emptySet(),
ctx.coordinator.coordinator().records());
+ assertEquals(Set.of(), ctx.coordinator.coordinator().records());
assertEquals(List.of(
transactionalRecords(100L, (short) 5, timer.time().milliseconds(),
"record1", "record2")
), writer.entries(TP));
@@ -2680,7 +2680,7 @@ public class CoordinatorRuntimeTest {
assertTrue(processor.poll());
// Verify that no operation was executed.
- assertEquals(Collections.emptySet(),
ctx.coordinator.coordinator().records());
+ assertEquals(Set.of(), ctx.coordinator.coordinator().records());
assertEquals(0, ctx.timer.size());
}
@@ -3010,8 +3010,8 @@ public class CoordinatorRuntimeTest {
startTimeMs + 500,
30,
3000),
- Collections.emptyList(),
- Collections.emptyList()))
+ List.of(),
+ List.of()))
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(supplier)
@@ -3127,8 +3127,8 @@ public class CoordinatorRuntimeTest {
1500,
30,
3000),
- Collections.emptyList(),
- Collections.emptyList()))
+ List.of(),
+ List.of()))
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(supplier)
@@ -3569,7 +3569,7 @@ public class CoordinatorRuntimeTest {
new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
new MockCoordinatorShard.RecordAndMetadata(1, records.get(1))
), ctx.coordinator.coordinator().fullRecords());
- assertEquals(Collections.emptyList(), writer.entries(TP));
+ assertEquals(List.of(), writer.entries(TP));
// Write #2 with one record.
CompletableFuture<String> write2 =
runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
@@ -3588,7 +3588,7 @@ public class CoordinatorRuntimeTest {
new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
new MockCoordinatorShard.RecordAndMetadata(2, records.get(2))
), ctx.coordinator.coordinator().fullRecords());
- assertEquals(Collections.emptyList(), writer.entries(TP));
+ assertEquals(List.of(), writer.entries(TP));
// Write #3 with one record. This one cannot go into the existing batch
// so the existing batch should be flushed and a new one should be
created.
@@ -3758,7 +3758,7 @@ public class CoordinatorRuntimeTest {
new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
new MockCoordinatorShard.RecordAndMetadata(2, records.get(2))
), ctx.coordinator.coordinator().fullRecords());
- assertEquals(Collections.emptyList(), writer.entries(TP));
+ assertEquals(List.of(), writer.entries(TP));
// Write #4. This write cannot make it in the current batch. So the
current batch
// is flushed. It will fail. So we expect all writes to fail.
@@ -3776,8 +3776,8 @@ public class CoordinatorRuntimeTest {
assertEquals(0L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(List.of(0L),
ctx.coordinator.snapshotRegistry().epochsList());
- assertEquals(Collections.emptyList(),
ctx.coordinator.coordinator().fullRecords());
- assertEquals(Collections.emptyList(), writer.entries(TP));
+ assertEquals(List.of(), ctx.coordinator.coordinator().fullRecords());
+ assertEquals(List.of(), writer.entries(TP));
}
@Test
@@ -3860,7 +3860,7 @@ public class CoordinatorRuntimeTest {
assertEquals(List.of(
new MockCoordinatorShard.RecordAndMetadata(0, records.get(0))
), ctx.coordinator.coordinator().fullRecords());
- assertEquals(Collections.emptyList(), writer.entries(TP));
+ assertEquals(List.of(), writer.entries(TP));
// Write #2. It should fail.
CompletableFuture<String> write2 =
runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
@@ -3874,8 +3874,8 @@ public class CoordinatorRuntimeTest {
assertEquals(0L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(List.of(0L),
ctx.coordinator.snapshotRegistry().epochsList());
- assertEquals(Collections.emptyList(),
ctx.coordinator.coordinator().fullRecords());
- assertEquals(Collections.emptyList(), writer.entries(TP));
+ assertEquals(List.of(), ctx.coordinator.coordinator().fullRecords());
+ assertEquals(List.of(), writer.entries(TP));
}
@Test
@@ -3921,9 +3921,9 @@ public class CoordinatorRuntimeTest {
assertEquals(0L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(List.of(0L),
ctx.coordinator.snapshotRegistry().epochsList());
- assertEquals(Collections.emptySet(),
ctx.coordinator.coordinator().pendingRecords(100L));
+ assertEquals(Set.of(),
ctx.coordinator.coordinator().pendingRecords(100L));
assertEquals(Set.of("record#1"),
ctx.coordinator.coordinator().records());
- assertEquals(Collections.emptyList(), writer.entries(TP));
+ assertEquals(List.of(), writer.entries(TP));
// Transactional write #2 with one record. This will flush the current
batch.
CompletableFuture<String> write2 =
runtime.scheduleTransactionalWriteOperation(
@@ -3989,7 +3989,7 @@ public class CoordinatorRuntimeTest {
assertEquals(4L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(List.of(0L, 1L, 2L, 3L, 4L),
ctx.coordinator.snapshotRegistry().epochsList());
- assertEquals(Collections.emptySet(),
ctx.coordinator.coordinator().pendingRecords(100L));
+ assertEquals(Set.of(),
ctx.coordinator.coordinator().pendingRecords(100L));
assertEquals(Set.of("record#1", "record#2", "record#3"),
ctx.coordinator.coordinator().records());
assertEquals(List.of(
records(timer.time().milliseconds(), "record#1"),
@@ -4168,7 +4168,7 @@ public class CoordinatorRuntimeTest {
// Schedule a write operation that does not generate any records.
CompletableFuture<String> write =
runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
- state -> new CoordinatorResult<>(Collections.emptyList(),
"response1"));
+ state -> new CoordinatorResult<>(List.of(), "response1"));
// The write operation should not be done.
assertFalse(write.isDone());
@@ -4356,7 +4356,7 @@ public class CoordinatorRuntimeTest {
new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
new MockCoordinatorShard.RecordAndMetadata(2, records.get(2))
), ctx.coordinator.coordinator().fullRecords());
- assertEquals(Collections.emptyList(), writer.entries(TP));
+ assertEquals(List.of(), writer.entries(TP));
// Let's write the 4th record which is too large. This will flush the
current
// pending batch, allocate a new batch, and put the record into it.
@@ -4454,7 +4454,7 @@ public class CoordinatorRuntimeTest {
new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
new MockCoordinatorShard.RecordAndMetadata(2, records.get(2))
), ctx.coordinator.coordinator().fullRecords());
- assertEquals(Collections.emptyList(), writer.entries(TP));
+ assertEquals(List.of(), writer.entries(TP));
// Write #4. This write cannot make it in the current batch. So the
current batch
// is flushed. It will fail. So we expect all writes to fail.
@@ -4472,8 +4472,8 @@ public class CoordinatorRuntimeTest {
assertEquals(0L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(List.of(0L),
ctx.coordinator.snapshotRegistry().epochsList());
- assertEquals(Collections.emptyList(),
ctx.coordinator.coordinator().fullRecords());
- assertEquals(Collections.emptyList(), writer.entries(TP));
+ assertEquals(List.of(), ctx.coordinator.coordinator().fullRecords());
+ assertEquals(List.of(), writer.entries(TP));
}
@Test
@@ -4516,7 +4516,7 @@ public class CoordinatorRuntimeTest {
// Write #2, with no records.
CompletableFuture<String> write2 =
runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
- state -> new CoordinatorResult<>(Collections.emptyList(),
"response2"));
+ state -> new CoordinatorResult<>(List.of(), "response2"));
// Write #2 should not be attached to the empty batch.
assertTrue(write2.isDone());
@@ -4601,7 +4601,7 @@ public class CoordinatorRuntimeTest {
);
// Verify the state. Records are replayed but no batch written.
- assertEquals(Collections.emptyList(), writer.entries(TP));
+ assertEquals(List.of(), writer.entries(TP));
verify(runtimeMetrics, times(0)).recordFlushTime(10);
// Write #3 with one record. This one cannot go into the existing batch
@@ -4779,7 +4779,7 @@ public class CoordinatorRuntimeTest {
// Records have been written to the log.
long writeTimestamp = timer.time().milliseconds();
- assertEquals(Collections.singletonList(
+ assertEquals(List.of(
records(writeTimestamp, "record1")
), writer.entries(TP));
@@ -4923,10 +4923,10 @@ public class CoordinatorRuntimeTest {
(result, exception) -> {
assertEquals("task result", result);
assertNull(exception);
- return new
CoordinatorResult<>(Collections.singletonList("record2"), null);
+ return new CoordinatorResult<>(List.of("record2"),
null);
}
);
- return new
CoordinatorResult<>(Collections.singletonList("record1"), "response1");
+ return new CoordinatorResult<>(List.of("record1"),
"response1");
}
);
diff --git
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/InMemoryPartitionWriter.java
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/InMemoryPartitionWriter.java
index a8551f0734b..66cfbbe8b10 100644
---
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/InMemoryPartitionWriter.java
+++
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/InMemoryPartitionWriter.java
@@ -89,7 +89,7 @@ public class InMemoryPartitionWriter implements
PartitionWriter {
@Override
public LogConfig config(TopicPartition tp) {
- return new LogConfig(Collections.emptyMap());
+ return new LogConfig(Map.of());
}
@Override
diff --git
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/KafkaMetricHistogramTest.java
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/KafkaMetricHistogramTest.java
index 72a4bce3fae..db3fb574448 100644
---
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/KafkaMetricHistogramTest.java
+++
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/KafkaMetricHistogramTest.java
@@ -22,7 +22,7 @@ import org.apache.kafka.common.metrics.Metrics;
import org.junit.jupiter.api.Test;
-import java.util.Collections;
+import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@@ -42,11 +42,11 @@ public class KafkaMetricHistogramTest {
);
Set<MetricName> expected = Set.of(
- new MetricName("test-metric-max", "test-group", "test
description", Collections.emptyMap()),
- new MetricName("test-metric-p999", "test-group", "test
description", Collections.emptyMap()),
- new MetricName("test-metric-p99", "test-group", "test
description", Collections.emptyMap()),
- new MetricName("test-metric-p95", "test-group", "test
description", Collections.emptyMap()),
- new MetricName("test-metric-p50", "test-group", "test
description", Collections.emptyMap())
+ new MetricName("test-metric-max", "test-group", "test
description", Map.of()),
+ new MetricName("test-metric-p999", "test-group", "test
description", Map.of()),
+ new MetricName("test-metric-p99", "test-group", "test
description", Map.of()),
+ new MetricName("test-metric-p95", "test-group", "test
description", Map.of()),
+ new MetricName("test-metric-p50", "test-group", "test
description", Map.of())
);
Set<MetricName> actual =
histogram.stats().stream().map(CompoundStat.NamedMeasurable::name).collect(Collectors.toSet());
assertEquals(expected, actual);
diff --git
a/server/src/main/java/org/apache/kafka/network/ConnectionQuotaEntity.java
b/server/src/main/java/org/apache/kafka/network/ConnectionQuotaEntity.java
index 5a2fddd7f1c..4487f4ab530 100644
--- a/server/src/main/java/org/apache/kafka/network/ConnectionQuotaEntity.java
+++ b/server/src/main/java/org/apache/kafka/network/ConnectionQuotaEntity.java
@@ -17,7 +17,6 @@
package org.apache.kafka.network;
import java.net.InetAddress;
-import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -37,21 +36,21 @@ public class ConnectionQuotaEntity {
return new ConnectionQuotaEntity(CONNECTION_RATE_SENSOR_NAME + "-" +
listenerName,
CONNECTION_RATE_METRIC_NAME,
Long.MAX_VALUE,
- Collections.singletonMap("listener", listenerName));
+ Map.of("listener", listenerName));
}
public static ConnectionQuotaEntity brokerQuotaEntity() {
return new ConnectionQuotaEntity(CONNECTION_RATE_SENSOR_NAME,
"broker-" + ConnectionQuotaEntity.CONNECTION_RATE_METRIC_NAME,
Long.MAX_VALUE,
- Collections.emptyMap());
+ Map.of());
}
public static ConnectionQuotaEntity ipQuotaEntity(InetAddress ip) {
return new ConnectionQuotaEntity(CONNECTION_RATE_SENSOR_NAME + "-" +
ip.getHostAddress(),
CONNECTION_RATE_METRIC_NAME,
TimeUnit.HOURS.toSeconds(1),
- Collections.singletonMap(IP_METRIC_TAG, ip.getHostAddress()));
+ Map.of(IP_METRIC_TAG, ip.getHostAddress()));
}
private final String sensorName;
diff --git
a/server/src/main/java/org/apache/kafka/network/metrics/RequestMetrics.java
b/server/src/main/java/org/apache/kafka/network/metrics/RequestMetrics.java
index 991be1f59a6..bfdaad6d9d1 100644
--- a/server/src/main/java/org/apache/kafka/network/metrics/RequestMetrics.java
+++ b/server/src/main/java/org/apache/kafka/network/metrics/RequestMetrics.java
@@ -24,7 +24,6 @@ import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import com.yammer.metrics.core.Histogram;
import com.yammer.metrics.core.Meter;
-import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -87,7 +86,7 @@ public class RequestMetrics {
public RequestMetrics(String name) {
this.name = name;
- tags = Collections.singletonMap("request", name);
+ tags = Map.of("request", name);
// time a request spent in a request queue
requestQueueTimeHist =
metricsGroup.newHistogram(REQUEST_QUEUE_TIME_MS, true, tags);
// time a request takes to be processed at the local broker
diff --git
a/server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java
b/server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java
index 4b1b12aefdc..7aa0042b133 100644
--- a/server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java
+++ b/server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
@@ -62,7 +61,7 @@ public class AclEntry {
case TRANSACTIONAL_ID:
return new HashSet<>(Arrays.asList(DESCRIBE, WRITE));
case DELEGATION_TOKEN:
- return Collections.singleton(DESCRIBE);
+ return Set.of(DESCRIBE);
case USER:
return new HashSet<>(Arrays.asList(CREATE_TOKENS,
DESCRIBE_TOKENS));
default:
diff --git
a/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java
b/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java
index 3605a175e08..b6223418ab5 100644
--- a/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java
+++ b/server/src/main/java/org/apache/kafka/server/AssignmentsManager.java
@@ -46,7 +46,6 @@ import com.yammer.metrics.core.MetricsRegistry;
import org.slf4j.Logger;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -181,7 +180,7 @@ public final class AssignmentsManager {
this.directoryIdToDescription = directoryIdToDescription;
this.metadataImageSupplier = metadataImageSupplier;
this.ready = new ConcurrentHashMap<>();
- this.inflight = Collections.emptyMap();
+ this.inflight = Map.of();
this.metricsRegistry = metricsRegistry;
this.metricsRegistry.newGauge(QUEUED_REPLICA_TO_DIR_ASSIGNMENTS_METRIC, new
Gauge<Integer>() {
@Override
@@ -363,7 +362,7 @@ public final class AssignmentsManager {
Map<TopicIdPartition, Assignment> sent,
Optional<ClientResponse> assignmentResponse
) {
- inflight = Collections.emptyMap();
+ inflight = Map.of();
Optional<String> globalResponseError =
globalResponseError(assignmentResponse);
if (globalResponseError.isPresent()) {
previousGlobalFailures++;
diff --git
a/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
b/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
index 96eb4288dad..5b77004d0ae 100644
--- a/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
+++ b/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
@@ -596,7 +596,7 @@ public class ClientMetricsManager implements AutoCloseable {
Sensor unknownSubscriptionRequestCountSensor = metrics.sensor(
ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST);
unknownSubscriptionRequestCountSensor.add(createMeter(metrics, new
WindowedCount(),
- ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST,
Collections.emptyMap()));
+ ClientMetricsStats.UNKNOWN_SUBSCRIPTION_REQUEST, Map.of()));
sensorsName.add(unknownSubscriptionRequestCountSensor.name());
}
@@ -607,7 +607,7 @@ public class ClientMetricsManager implements AutoCloseable {
return;
}
- Map<String, String> tags =
Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID,
clientInstanceId.toString());
+ Map<String, String> tags =
Map.of(ClientMetricsConfigs.CLIENT_INSTANCE_ID, clientInstanceId.toString());
Sensor throttleCount = metrics.sensor(ClientMetricsStats.THROTTLE
+ "-" + clientInstanceId);
throttleCount.add(createMeter(metrics, new WindowedCount(),
ClientMetricsStats.THROTTLE, tags));
diff --git
a/server/src/main/java/org/apache/kafka/server/metrics/BrokerServerMetrics.java
b/server/src/main/java/org/apache/kafka/server/metrics/BrokerServerMetrics.java
index 617a1542864..7d135d703c2 100644
---
a/server/src/main/java/org/apache/kafka/server/metrics/BrokerServerMetrics.java
+++
b/server/src/main/java/org/apache/kafka/server/metrics/BrokerServerMetrics.java
@@ -23,7 +23,7 @@ import org.apache.kafka.image.MetadataProvenance;
import com.yammer.metrics.core.Histogram;
-import java.util.Collections;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -36,7 +36,7 @@ public final class BrokerServerMetrics implements
AutoCloseable {
private final KafkaMetricsGroup metricsGroup = new
KafkaMetricsGroup("kafka.server", "BrokerMetadataListener");
private final com.yammer.metrics.core.MetricName
batchProcessingTimeHistName =
- metricsGroup.metricName("MetadataBatchProcessingTimeUs",
Collections.emptyMap());
+ metricsGroup.metricName("MetadataBatchProcessingTimeUs", Map.of());
/**
* A histogram tracking the time in microseconds it took to process
batches of events.
@@ -45,7 +45,7 @@ public final class BrokerServerMetrics implements
AutoCloseable {
.newHistogram(batchProcessingTimeHistName, true);
private final com.yammer.metrics.core.MetricName batchSizeHistName =
- metricsGroup.metricName("MetadataBatchSizes",
Collections.emptyMap());
+ metricsGroup.metricName("MetadataBatchSizes", Map.of());
/**
* A histogram tracking the sizes of batches that we have processed.
diff --git
a/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java
b/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java
index 830bd29a243..ba94486e83c 100644
---
a/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java
+++
b/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java
@@ -24,7 +24,6 @@ import
org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.InvalidRequestException;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -99,9 +98,9 @@ public class ClientMetricsConfigs extends AbstractConfig {
));
private static final ConfigDef CONFIG = new ConfigDef()
- .define(SUBSCRIPTION_METRICS, Type.LIST, Collections.emptyList(),
Importance.MEDIUM, "Subscription metrics list")
+ .define(SUBSCRIPTION_METRICS, Type.LIST, List.of(), Importance.MEDIUM,
"Subscription metrics list")
.define(PUSH_INTERVAL_MS, Type.INT, DEFAULT_INTERVAL_MS,
Importance.MEDIUM, "Push interval in milliseconds")
- .define(CLIENT_MATCH_PATTERN, Type.LIST, Collections.emptyList(),
Importance.MEDIUM, "Client match pattern list");
+ .define(CLIENT_MATCH_PATTERN, Type.LIST, List.of(), Importance.MEDIUM,
"Client match pattern list");
public ClientMetricsConfigs(Properties props) {
super(CONFIG, props);
@@ -165,7 +164,7 @@ public class ClientMetricsConfigs extends AbstractConfig {
*/
public static Map<String, Pattern> parseMatchingPatterns(List<String>
patterns) {
if (patterns == null || patterns.isEmpty()) {
- return Collections.emptyMap();
+ return Map.of();
}
Map<String, Pattern> patternsMap = new HashMap<>();
diff --git
a/server/src/main/java/org/apache/kafka/server/share/context/FinalContext.java
b/server/src/main/java/org/apache/kafka/server/share/context/FinalContext.java
index ab767cdb237..1be37ae87b7 100644
---
a/server/src/main/java/org/apache/kafka/server/share/context/FinalContext.java
+++
b/server/src/main/java/org/apache/kafka/server/share/context/FinalContext.java
@@ -27,8 +27,8 @@ import
org.apache.kafka.server.share.ErroneousAndValidPartitionData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Collections;
import java.util.LinkedHashMap;
+import java.util.List;
/**
* The share fetch context for a final share fetch request.
@@ -55,7 +55,7 @@ public class FinalContext extends ShareFetchContext {
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> updates) {
log.debug("Final context returning {}",
partitionsToLogString(updates.keySet()));
return new
ShareFetchResponse(ShareFetchResponse.toMessage(Errors.NONE, 0,
- updates.entrySet().iterator(), Collections.emptyList()));
+ updates.entrySet().iterator(), List.of()));
}
@Override
diff --git
a/server/src/main/java/org/apache/kafka/server/share/context/ShareFetchContext.java
b/server/src/main/java/org/apache/kafka/server/share/context/ShareFetchContext.java
index 2c472660d49..36062616cec 100644
---
a/server/src/main/java/org/apache/kafka/server/share/context/ShareFetchContext.java
+++
b/server/src/main/java/org/apache/kafka/server/share/context/ShareFetchContext.java
@@ -27,6 +27,7 @@ import org.apache.kafka.server.share.session.ShareSession;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
+import java.util.List;
/**
* The context for every share fetch request. The context is responsible for
tracking the topic partitions present in
@@ -50,7 +51,7 @@ public abstract class ShareFetchContext {
*/
public ShareFetchResponse throttleResponse(int throttleTimeMs) {
return new
ShareFetchResponse(ShareFetchResponse.toMessage(Errors.NONE, throttleTimeMs,
- Collections.emptyIterator(), Collections.emptyList()));
+ Collections.emptyIterator(), List.of()));
}
/**
diff --git
a/server/src/main/java/org/apache/kafka/server/share/context/ShareSessionContext.java
b/server/src/main/java/org/apache/kafka/server/share/context/ShareSessionContext.java
index 1936ec1f0cc..18df489af33 100644
---
a/server/src/main/java/org/apache/kafka/server/share/context/ShareSessionContext.java
+++
b/server/src/main/java/org/apache/kafka/server/share/context/ShareSessionContext.java
@@ -38,6 +38,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
@@ -103,7 +104,7 @@ public class ShareSessionContext extends ShareFetchContext {
public ShareFetchResponse throttleResponse(int throttleTimeMs) {
if (!isSubsequent) {
return new
ShareFetchResponse(ShareFetchResponse.toMessage(Errors.NONE, throttleTimeMs,
- Collections.emptyIterator(), Collections.emptyList()));
+ Collections.emptyIterator(), List.of()));
}
int expectedEpoch =
ShareRequestMetadata.nextEpoch(reqMetadata.epoch());
int sessionEpoch;
@@ -114,10 +115,10 @@ public class ShareSessionContext extends
ShareFetchContext {
log.debug("Subsequent share session {} expected epoch {}, but got
{}. " +
"Possible duplicate request.", session.key(),
expectedEpoch, sessionEpoch);
return new
ShareFetchResponse(ShareFetchResponse.toMessage(Errors.INVALID_SHARE_SESSION_EPOCH,
- throttleTimeMs, Collections.emptyIterator(),
Collections.emptyList()));
+ throttleTimeMs, Collections.emptyIterator(), List.of()));
}
return new
ShareFetchResponse(ShareFetchResponse.toMessage(Errors.NONE, throttleTimeMs,
- Collections.emptyIterator(), Collections.emptyList()));
+ Collections.emptyIterator(), List.of()));
}
/**
@@ -196,7 +197,7 @@ public class ShareSessionContext extends ShareFetchContext {
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> updates) {
if (!isSubsequent) {
return new ShareFetchResponse(ShareFetchResponse.toMessage(
- Errors.NONE, 0, updates.entrySet().iterator(),
Collections.emptyList()));
+ Errors.NONE, 0, updates.entrySet().iterator(), List.of()));
} else {
int expectedEpoch =
ShareRequestMetadata.nextEpoch(reqMetadata.epoch());
int sessionEpoch;
@@ -207,7 +208,7 @@ public class ShareSessionContext extends ShareFetchContext {
log.debug("Subsequent share session {} expected epoch {}, but
got {}. Possible duplicate request.",
session.key(), expectedEpoch, sessionEpoch);
return new
ShareFetchResponse(ShareFetchResponse.toMessage(Errors.INVALID_SHARE_SESSION_EPOCH,
- 0, Collections.emptyIterator(),
Collections.emptyList()));
+ 0, Collections.emptyIterator(), List.of()));
}
// Iterate over the update list using PartitionIterator. This will
prune updates which don't need to be sent
Iterator<Map.Entry<TopicIdPartition,
ShareFetchResponseData.PartitionData>> partitionIterator = new
PartitionIterator(
@@ -218,7 +219,7 @@ public class ShareSessionContext extends ShareFetchContext {
log.debug("Subsequent share session context with session key {}
returning {}", session.key(),
partitionsToLogString(updates.keySet()));
return new ShareFetchResponse(ShareFetchResponse.toMessage(
- Errors.NONE, 0, updates.entrySet().iterator(),
Collections.emptyList()));
+ Errors.NONE, 0, updates.entrySet().iterator(), List.of()));
}
}
diff --git
a/server/src/test/java/org/apache/kafka/network/RequestConvertToJsonTest.java
b/server/src/test/java/org/apache/kafka/network/RequestConvertToJsonTest.java
index 3bcb5136699..701cfae1fa6 100644
---
a/server/src/test/java/org/apache/kafka/network/RequestConvertToJsonTest.java
+++
b/server/src/test/java/org/apache/kafka/network/RequestConvertToJsonTest.java
@@ -36,7 +36,6 @@ import org.junit.jupiter.api.Test;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -70,7 +69,7 @@ public class RequestConvertToJsonTest {
}
}
}
- assertEquals(Collections.emptyList(), unhandledKeys, "Unhandled
request keys");
+ assertEquals(List.of(), unhandledKeys, "Unhandled request keys");
}
@Test
@@ -116,7 +115,7 @@ public class RequestConvertToJsonTest {
}
}
}
- assertEquals(Collections.emptyList(), unhandledKeys, "Unhandled
response keys");
+ assertEquals(List.of(), unhandledKeys, "Unhandled response keys");
}
@Test
diff --git
a/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java
b/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java
index f426c355ceb..34af19cfa37 100644
--- a/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java
+++ b/server/src/test/java/org/apache/kafka/server/AssignmentsManagerTest.java
@@ -58,7 +58,6 @@ import org.slf4j.LoggerFactory;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
@@ -490,25 +489,25 @@ public class AssignmentsManagerTest {
setTopics(Arrays.asList(
new AssignReplicasToDirsRequestData.TopicData().
setTopicId(TOPIC_1).
- setPartitions(Collections.singletonList(
+ setPartitions(List.of(
new
AssignReplicasToDirsRequestData.PartitionData().
setPartitionIndex(2))),
new AssignReplicasToDirsRequestData.TopicData().
setTopicId(TOPIC_2).
- setPartitions(Collections.singletonList(
+ setPartitions(List.of(
new
AssignReplicasToDirsRequestData.PartitionData().
setPartitionIndex(5))))),
new AssignReplicasToDirsRequestData.DirectoryData().
setId(DIR_3).
- setTopics(Collections.singletonList(
+ setTopics(List.of(
new AssignReplicasToDirsRequestData.TopicData().
setTopicId(TOPIC_1).
- setPartitions(Collections.singletonList(
+ setPartitions(List.of(
new
AssignReplicasToDirsRequestData.PartitionData().
setPartitionIndex(3))))),
new AssignReplicasToDirsRequestData.DirectoryData().
setId(DIR_1).
- setTopics(Collections.singletonList(
+ setTopics(List.of(
new AssignReplicasToDirsRequestData.TopicData().
setTopicId(TOPIC_1).
setPartitions(Arrays.asList(
diff --git
a/server/src/test/java/org/apache/kafka/server/metrics/BrokerServerMetricsTest.java
b/server/src/test/java/org/apache/kafka/server/metrics/BrokerServerMetricsTest.java
index 6b5c84c8aea..20fa408d358 100644
---
a/server/src/test/java/org/apache/kafka/server/metrics/BrokerServerMetricsTest.java
+++
b/server/src/test/java/org/apache/kafka/server/metrics/BrokerServerMetricsTest.java
@@ -25,7 +25,6 @@ import org.apache.kafka.image.MetadataProvenance;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
@@ -43,12 +42,12 @@ public final class BrokerServerMetricsTest {
// Metric description is not use for metric name equality
Set<MetricName> expectedMetrics = new HashSet<>(Arrays.asList(
- new MetricName("last-applied-record-offset", expectedGroup,
"", Collections.emptyMap()),
- new MetricName("last-applied-record-timestamp", expectedGroup,
"", Collections.emptyMap()),
- new MetricName("last-applied-record-lag-ms", expectedGroup,
"", Collections.emptyMap()),
- new MetricName("metadata-load-error-count", expectedGroup, "",
Collections.emptyMap()),
- new MetricName("metadata-apply-error-count", expectedGroup,
"", Collections.emptyMap()),
- new MetricName("ignored-static-voters", expectedGroup, "",
Collections.emptyMap())
+ new MetricName("last-applied-record-offset", expectedGroup,
"", Map.of()),
+ new MetricName("last-applied-record-timestamp", expectedGroup,
"", Map.of()),
+ new MetricName("last-applied-record-lag-ms", expectedGroup,
"", Map.of()),
+ new MetricName("metadata-load-error-count", expectedGroup, "",
Map.of()),
+ new MetricName("metadata-apply-error-count", expectedGroup,
"", Map.of()),
+ new MetricName("ignored-static-voters", expectedGroup, "",
Map.of())
));
try (BrokerServerMetrics ignored = new BrokerServerMetrics(metrics)) {
diff --git
a/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsInstanceMetadataTest.java
b/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsInstanceMetadataTest.java
index 8bf22e7f90f..4bf26ba55ca 100644
---
a/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsInstanceMetadataTest.java
+++
b/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsInstanceMetadataTest.java
@@ -22,7 +22,6 @@ import org.junit.jupiter.api.Test;
import java.net.InetAddress;
import java.net.UnknownHostException;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;
@@ -37,25 +36,25 @@ public class ClientMetricsInstanceMetadataTest {
Uuid uuid = Uuid.randomUuid();
ClientMetricsInstanceMetadata instanceMetadata = new
ClientMetricsInstanceMetadata(uuid, ClientMetricsTestUtils.requestContext());
// We consider empty/missing client matching patterns as valid
- assertTrue(instanceMetadata.isMatch(Collections.emptyMap()));
+ assertTrue(instanceMetadata.isMatch(Map.of()));
assertTrue(instanceMetadata.isMatch(
- Collections.singletonMap(ClientMetricsConfigs.CLIENT_ID,
Pattern.compile(".*"))));
+ Map.of(ClientMetricsConfigs.CLIENT_ID, Pattern.compile(".*"))));
assertTrue(instanceMetadata.isMatch(
- Collections.singletonMap(ClientMetricsConfigs.CLIENT_ID,
Pattern.compile("producer-1"))));
+ Map.of(ClientMetricsConfigs.CLIENT_ID,
Pattern.compile("producer-1"))));
assertTrue(instanceMetadata.isMatch(
- Collections.singletonMap(ClientMetricsConfigs.CLIENT_ID,
Pattern.compile("producer.*"))));
+ Map.of(ClientMetricsConfigs.CLIENT_ID,
Pattern.compile("producer.*"))));
assertTrue(instanceMetadata.isMatch(
- Collections.singletonMap(ClientMetricsConfigs.CLIENT_INSTANCE_ID,
Pattern.compile(uuid.toString()))));
+ Map.of(ClientMetricsConfigs.CLIENT_INSTANCE_ID,
Pattern.compile(uuid.toString()))));
assertTrue(instanceMetadata.isMatch(
-
Collections.singletonMap(ClientMetricsConfigs.CLIENT_SOFTWARE_NAME,
Pattern.compile("apache-kafka-java"))));
+ Map.of(ClientMetricsConfigs.CLIENT_SOFTWARE_NAME,
Pattern.compile("apache-kafka-java"))));
assertTrue(instanceMetadata.isMatch(
-
Collections.singletonMap(ClientMetricsConfigs.CLIENT_SOFTWARE_VERSION,
Pattern.compile("3.5.2"))));
+ Map.of(ClientMetricsConfigs.CLIENT_SOFTWARE_VERSION,
Pattern.compile("3.5.2"))));
assertTrue(instanceMetadata.isMatch(
-
Collections.singletonMap(ClientMetricsConfigs.CLIENT_SOURCE_ADDRESS,
Pattern.compile(
+ Map.of(ClientMetricsConfigs.CLIENT_SOURCE_ADDRESS, Pattern.compile(
InetAddress.getLocalHost().getHostAddress()))));
assertTrue(instanceMetadata.isMatch(
- Collections.singletonMap(ClientMetricsConfigs.CLIENT_SOURCE_PORT,
Pattern.compile(
+ Map.of(ClientMetricsConfigs.CLIENT_SOURCE_PORT, Pattern.compile(
String.valueOf(ClientMetricsTestUtils.CLIENT_PORT)))));
}
@@ -125,9 +124,9 @@ public class ClientMetricsInstanceMetadataTest {
ClientMetricsTestUtils.requestContext());
// Unknown key in pattern map
-
assertFalse(instanceMetadata.isMatch(Collections.singletonMap("unknown",
Pattern.compile(".*"))));
+ assertFalse(instanceMetadata.isMatch(Map.of("unknown",
Pattern.compile(".*"))));
// '*' key is considered as invalid regex pattern
- assertFalse(instanceMetadata.isMatch(Collections.singletonMap("*",
Pattern.compile(".*"))));
+ assertFalse(instanceMetadata.isMatch(Map.of("*",
Pattern.compile(".*"))));
}
@Test
@@ -136,9 +135,9 @@ public class ClientMetricsInstanceMetadataTest {
ClientMetricsInstanceMetadata instanceMetadata = new
ClientMetricsInstanceMetadata(uuid,
ClientMetricsTestUtils.requestContextWithNullClientInfo());
-
assertFalse(instanceMetadata.isMatch(Collections.singletonMap(ClientMetricsConfigs.CLIENT_SOFTWARE_NAME,
+
assertFalse(instanceMetadata.isMatch(Map.of(ClientMetricsConfigs.CLIENT_SOFTWARE_NAME,
Pattern.compile(".*"))));
-
assertFalse(instanceMetadata.isMatch(Collections.singletonMap(ClientMetricsConfigs.CLIENT_SOFTWARE_VERSION,
+
assertFalse(instanceMetadata.isMatch(Map.of(ClientMetricsConfigs.CLIENT_SOFTWARE_VERSION,
Pattern.compile(".*"))));
}
}
diff --git
a/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java
b/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java
index 7b8d42cd859..d5d42c5a3ec 100644
---
a/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java
+++
b/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionCacheTest.java
@@ -24,8 +24,8 @@ import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.Iterator;
+import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -61,7 +61,7 @@ public class ShareSessionCacheTest {
assertEquals(0, cache.totalPartitions());
ShareSessionKey key1 = cache.maybeCreateSession("grp",
Uuid.randomUuid(), 0, mockedSharePartitionMap(2));
assertNotNull(key1);
- assertShareCacheContains(cache, new
ArrayList<>(Collections.singletonList(key1)));
+ assertShareCacheContains(cache, new ArrayList<>(List.of(key1)));
ShareSession session1 = cache.get(key1);
assertEquals(2, session1.size());
assertEquals(2, cache.totalPartitions());
@@ -82,7 +82,7 @@ public class ShareSessionCacheTest {
assertEquals(6, cache.totalPartitions());
assertEquals(2, cache.size());
cache.remove(key1);
- assertShareCacheContains(cache, new
ArrayList<>(Collections.singletonList(key2)));
+ assertShareCacheContains(cache, new ArrayList<>(List.of(key2)));
assertEquals(1, cache.size());
assertEquals(4, cache.totalPartitions());
diff --git
a/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionTest.java
b/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionTest.java
index a9022bb2fea..6a30ac97ce4 100644
---
a/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionTest.java
+++
b/server/src/test/java/org/apache/kafka/server/share/session/ShareSessionTest.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.Uuid;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -46,10 +45,10 @@ public class ShareSessionTest {
@Test
public void testPartitionsToLogStringEmpty() {
- String response =
ShareSession.partitionsToLogString(Collections.emptyList(), false);
+ String response = ShareSession.partitionsToLogString(List.of(), false);
assertEquals("0 partition(s)", response);
- response = ShareSession.partitionsToLogString(Collections.emptyList(),
true);
+ response = ShareSession.partitionsToLogString(List.of(), true);
assertEquals("( [] )", response);
}
}