This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a206feb4ba2 MINOR: Clean up share-coordinator (#19007)
a206feb4ba2 is described below
commit a206feb4ba2e051395a5ef51b80767ba26307a29
Author: Sanskar Jhajharia <[email protected]>
AuthorDate: Sun Feb 23 08:57:38 2025 +0530
MINOR: Clean up share-coordinator (#19007)
Given that now we support Java 17 on our brokers, this PR replace the use
of `Collections.singletonList()` and `Collections.emptyList()` with `List.of()`
Reviewers: Andrew Schofield <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../coordinator/share/ShareCoordinatorService.java | 9 +-
.../coordinator/share/ShareCoordinatorShard.java | 9 +-
.../share/metrics/ShareCoordinatorMetrics.java | 2 +-
.../share/PersisterStateBatchCombinerTest.java | 13 +-
.../share/ShareCoordinatorRecordHelpersTest.java | 10 +-
.../share/ShareCoordinatorRecordSerdeTest.java | 4 +-
.../share/ShareCoordinatorShardTest.java | 162 ++++++++++-----------
.../share/ShareCoordinatorTestConfig.java | 2 +-
8 files changed, 104 insertions(+), 107 deletions(-)
diff --git
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
index 764e008136e..8b87aed65ef 100644
---
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
+++
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
@@ -62,7 +62,6 @@ import org.slf4j.Logger;
import java.time.Duration;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -413,9 +412,9 @@ public class ShareCoordinatorService implements
ShareCoordinator {
Duration.ofMillis(config.shareCoordinatorWriteTimeoutMs()),
coordinator -> coordinator.writeState(new
WriteShareGroupStateRequestData()
.setGroupId(groupId)
- .setTopics(Collections.singletonList(new
WriteShareGroupStateRequestData.WriteStateData()
+ .setTopics(List.of(new
WriteShareGroupStateRequestData.WriteStateData()
.setTopicId(topicData.topicId())
-
.setPartitions(Collections.singletonList(new
WriteShareGroupStateRequestData.PartitionData()
+ .setPartitions(List.of(new
WriteShareGroupStateRequestData.PartitionData()
.setPartition(partitionData.partition())
.setStartOffset(partitionData.startOffset())
.setLeaderEpoch(partitionData.leaderEpoch())
@@ -531,9 +530,9 @@ public class ShareCoordinatorService implements
ShareCoordinator {
ReadShareGroupStateRequestData requestForCurrentPartition =
new ReadShareGroupStateRequestData()
.setGroupId(groupId)
- .setTopics(Collections.singletonList(new
ReadShareGroupStateRequestData.ReadStateData()
+ .setTopics(List.of(new
ReadShareGroupStateRequestData.ReadStateData()
.setTopicId(topicId)
-
.setPartitions(Collections.singletonList(partitionData))));
+ .setPartitions(List.of(partitionData))));
// We are issuing a scheduleWriteOperation even though the
request is of read type since
// we might want to update the leader epoch, if it is the
highest seen so far for the specific
diff --git
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
index 1022a36fb65..4bbdb2c03cc 100644
---
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
+++
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
@@ -66,7 +66,6 @@ import org.apache.kafka.timeline.TimelineHashMap;
import org.slf4j.Logger;
-import java.util.Collections;
import java.util.List;
import java.util.Optional;
@@ -88,7 +87,7 @@ public class ShareCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
public static final Exception NEGATIVE_PARTITION_ID = new Exception("The
partition id cannot be a negative number.");
public static class Builder implements
CoordinatorShardBuilder<ShareCoordinatorShard, CoordinatorRecord> {
- private ShareCoordinatorConfig config;
+ private final ShareCoordinatorConfig config;
private LogContext logContext;
private SnapshotRegistry snapshotRegistry;
private CoordinatorMetrics coordinatorMetrics;
@@ -326,7 +325,7 @@ public class ShareCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
))
);
- return new CoordinatorResult<>(Collections.singletonList(record),
responseData);
+ return new CoordinatorResult<>(List.of(record), responseData);
}
/**
@@ -409,7 +408,7 @@ public class ShareCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
.setStateEpoch(responseData.results().get(0).partitions().get(0).stateEpoch());
CoordinatorRecord record =
generateShareStateRecord(writePartitionData, key);
- return new CoordinatorResult<>(Collections.singletonList(record),
responseData);
+ return new CoordinatorResult<>(List.of(record), responseData);
}
/**
@@ -519,7 +518,7 @@ public class ShareCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
))
);
- return new CoordinatorResult<>(Collections.singletonList(record),
responseData);
+ return new CoordinatorResult<>(List.of(record), responseData);
}
/**
diff --git
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/metrics/ShareCoordinatorMetrics.java
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/metrics/ShareCoordinatorMetrics.java
index 197b1d76e0b..9081f384a48 100644
---
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/metrics/ShareCoordinatorMetrics.java
+++
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/metrics/ShareCoordinatorMetrics.java
@@ -48,7 +48,7 @@ public class ShareCoordinatorMetrics extends
CoordinatorMetrics implements AutoC
public static final String SHARE_COORDINATOR_WRITE_SENSOR_NAME =
"ShareCoordinatorWrite";
public static final String SHARE_COORDINATOR_WRITE_LATENCY_SENSOR_NAME =
"ShareCoordinatorWriteLatency";
public static final String SHARE_COORDINATOR_STATE_TOPIC_PRUNE_SENSOR_NAME
= "ShareCoordinatorStateTopicPruneSensorName";
- private Map<TopicPartition, ShareGroupPruneMetrics> pruneMetrics = new
ConcurrentHashMap<>();
+ private final Map<TopicPartition, ShareGroupPruneMetrics> pruneMetrics =
new ConcurrentHashMap<>();
/**
* Global sensors. These are shared across all metrics shards.
diff --git
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/PersisterStateBatchCombinerTest.java
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/PersisterStateBatchCombinerTest.java
index 6541e9479fd..26ddbea2c14 100644
---
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/PersisterStateBatchCombinerTest.java
+++
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/PersisterStateBatchCombinerTest.java
@@ -22,7 +22,6 @@ import
org.apache.kafka.server.share.persister.PersisterStateBatch;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
-import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Stream;
@@ -70,7 +69,7 @@ public class PersisterStateBatchCombinerTest {
int deliveryState,
int deliveryCount
) {
- return Collections.singletonList(
+ return List.of(
new PersisterStateBatch(firstOffset, lastOffset, (byte)
deliveryState, (short) deliveryCount)
);
}
@@ -108,14 +107,14 @@ public class PersisterStateBatchCombinerTest {
new BatchTestHolder(
"Current batches with start offset midway are pruned.",
BatchTestHolder.singleBatch(100, 130, 0, 1),
- Collections.emptyList(),
+ List.of(),
BatchTestHolder.singleBatch(120, 130, 0, 1),
120
),
new BatchTestHolder(
"New batches with start offset midway are pruned.",
- Collections.emptyList(),
+ List.of(),
BatchTestHolder.singleBatch(100, 130, 0, 1),
BatchTestHolder.singleBatch(120, 130, 0, 1),
120
@@ -123,9 +122,9 @@ public class PersisterStateBatchCombinerTest {
new BatchTestHolder(
"Both current and new batches empty.",
- Collections.emptyList(),
- Collections.emptyList(),
- Collections.emptyList(),
+ List.of(),
+ List.of(),
+ List.of(),
120
)
);
diff --git
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpersTest.java
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpersTest.java
index 9de59e499d1..77e539c1dc3 100644
---
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpersTest.java
+++
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpersTest.java
@@ -27,7 +27,7 @@ import
org.apache.kafka.server.share.persister.PersisterStateBatch;
import org.junit.jupiter.api.Test;
-import java.util.Collections;
+import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -47,7 +47,7 @@ public class ShareCoordinatorRecordHelpersTest {
.setStateEpoch(1)
.setLeaderEpoch(5)
.setStartOffset(0)
- .setStateBatches(Collections.singletonList(batch))
+ .setStateBatches(List.of(batch))
.build()
);
@@ -62,7 +62,7 @@ public class ShareCoordinatorRecordHelpersTest {
.setStateEpoch(1)
.setLeaderEpoch(5)
.setStartOffset(0)
- .setStateBatches(Collections.singletonList(
+ .setStateBatches(List.of(
new ShareSnapshotValue.StateBatch()
.setFirstOffset(1L)
.setLastOffset(10L)
@@ -88,7 +88,7 @@ public class ShareCoordinatorRecordHelpersTest {
.setStateEpoch(-1) // ignored for share update
.setLeaderEpoch(5)
.setStartOffset(0)
- .setStateBatches(Collections.singletonList(batch))
+ .setStateBatches(List.of(batch))
.build()
);
@@ -102,7 +102,7 @@ public class ShareCoordinatorRecordHelpersTest {
.setSnapshotEpoch(0)
.setLeaderEpoch(5)
.setStartOffset(0)
- .setStateBatches(Collections.singletonList(
+ .setStateBatches(List.of(
new ShareUpdateValue.StateBatch()
.setFirstOffset(1L)
.setLastOffset(10L)
diff --git
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordSerdeTest.java
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordSerdeTest.java
index 8f0bb7e82d1..9706902f0b1 100644
---
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordSerdeTest.java
+++
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordSerdeTest.java
@@ -30,7 +30,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.nio.ByteBuffer;
-import java.util.Collections;
+import java.util.List;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -221,7 +221,7 @@ public class ShareCoordinatorRecordSerdeTest {
.setLeaderEpoch(2)
.setStateEpoch(1)
.setSnapshotEpoch(1)
- .setStateBatches(Collections.singletonList(new
ShareSnapshotValue.StateBatch()
+ .setStateBatches(List.of(new
ShareSnapshotValue.StateBatch()
.setFirstOffset(1)
.setLastOffset(10)
.setDeliveryState((byte) 0)
diff --git
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
index ab136f8dc31..64b7ba21e9f 100644
---
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
+++
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
@@ -132,14 +132,14 @@ class ShareCoordinatorShardTest {
WriteShareGroupStateRequestData request = new
WriteShareGroupStateRequestData()
.setGroupId(GROUP_ID)
- .setTopics(Collections.singletonList(new
WriteShareGroupStateRequestData.WriteStateData()
+ .setTopics(List.of(new
WriteShareGroupStateRequestData.WriteStateData()
.setTopicId(TOPIC_ID)
- .setPartitions(Collections.singletonList(new
WriteShareGroupStateRequestData.PartitionData()
+ .setPartitions(List.of(new
WriteShareGroupStateRequestData.PartitionData()
.setPartition(PARTITION)
.setStartOffset(0)
.setStateEpoch(0)
.setLeaderEpoch(leaderEpoch)
- .setStateBatches(Collections.singletonList(new
WriteShareGroupStateRequestData.StateBatch()
+ .setStateBatches(List.of(new
WriteShareGroupStateRequestData.StateBatch()
.setFirstOffset(0)
.setLastOffset(10)
.setDeliveryCount((short) 1)
@@ -172,7 +172,7 @@ class ShareCoordinatorShardTest {
.setSnapshotEpoch(0)
.setStateEpoch(0)
.setLeaderEpoch(leaderEpoch)
- .setStateBatches(Collections.singletonList(
+ .setStateBatches(List.of(
new ShareSnapshotValue.StateBatch()
.setFirstOffset(0)
.setLastOffset(10)
@@ -192,7 +192,7 @@ class ShareCoordinatorShardTest {
.setSnapshotEpoch(1)
.setStateEpoch(1)
.setLeaderEpoch(leaderEpoch + 1)
- .setStateBatches(Collections.singletonList(
+ .setStateBatches(List.of(
new ShareSnapshotValue.StateBatch()
.setFirstOffset(11)
.setLastOffset(12)
@@ -225,14 +225,14 @@ class ShareCoordinatorShardTest {
WriteShareGroupStateRequestData request = new
WriteShareGroupStateRequestData()
.setGroupId(GROUP_ID)
- .setTopics(Collections.singletonList(new
WriteShareGroupStateRequestData.WriteStateData()
+ .setTopics(List.of(new
WriteShareGroupStateRequestData.WriteStateData()
.setTopicId(TOPIC_ID)
- .setPartitions(Collections.singletonList(new
WriteShareGroupStateRequestData.PartitionData()
+ .setPartitions(List.of(new
WriteShareGroupStateRequestData.PartitionData()
.setPartition(PARTITION)
.setStartOffset(0)
.setStateEpoch(0)
.setLeaderEpoch(0)
- .setStateBatches(Collections.singletonList(new
WriteShareGroupStateRequestData.StateBatch()
+ .setStateBatches(List.of(new
WriteShareGroupStateRequestData.StateBatch()
.setFirstOffset(0)
.setLastOffset(10)
.setDeliveryCount((short) 1)
@@ -243,7 +243,7 @@ class ShareCoordinatorShardTest {
shard.replay(0L, 0L, (short) 0, result.records().get(0));
WriteShareGroupStateResponseData expectedData =
WriteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION);
- List<CoordinatorRecord> expectedRecords =
Collections.singletonList(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
+ List<CoordinatorRecord> expectedRecords =
List.of(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
GROUP_ID, TOPIC_ID, PARTITION,
ShareGroupOffset.fromRequest(request.topics().get(0).partitions().get(0))
));
@@ -265,14 +265,14 @@ class ShareCoordinatorShardTest {
WriteShareGroupStateRequestData request1 = new
WriteShareGroupStateRequestData()
.setGroupId(GROUP_ID)
- .setTopics(Collections.singletonList(new
WriteShareGroupStateRequestData.WriteStateData()
+ .setTopics(List.of(new
WriteShareGroupStateRequestData.WriteStateData()
.setTopicId(TOPIC_ID)
- .setPartitions(Collections.singletonList(new
WriteShareGroupStateRequestData.PartitionData()
+ .setPartitions(List.of(new
WriteShareGroupStateRequestData.PartitionData()
.setPartition(PARTITION)
.setStartOffset(0)
.setStateEpoch(0)
.setLeaderEpoch(0)
- .setStateBatches(Collections.singletonList(new
WriteShareGroupStateRequestData.StateBatch()
+ .setStateBatches(List.of(new
WriteShareGroupStateRequestData.StateBatch()
.setFirstOffset(0)
.setLastOffset(10)
.setDeliveryCount((short) 1)
@@ -280,14 +280,14 @@ class ShareCoordinatorShardTest {
WriteShareGroupStateRequestData request2 = new
WriteShareGroupStateRequestData()
.setGroupId(GROUP_ID)
- .setTopics(Collections.singletonList(new
WriteShareGroupStateRequestData.WriteStateData()
+ .setTopics(List.of(new
WriteShareGroupStateRequestData.WriteStateData()
.setTopicId(TOPIC_ID)
- .setPartitions(Collections.singletonList(new
WriteShareGroupStateRequestData.PartitionData()
+ .setPartitions(List.of(new
WriteShareGroupStateRequestData.PartitionData()
.setPartition(PARTITION)
.setStartOffset(0)
.setStateEpoch(0)
.setLeaderEpoch(0)
- .setStateBatches(Collections.singletonList(new
WriteShareGroupStateRequestData.StateBatch()
+ .setStateBatches(List.of(new
WriteShareGroupStateRequestData.StateBatch()
.setFirstOffset(11)
.setLastOffset(20)
.setDeliveryCount((short) 1)
@@ -298,7 +298,7 @@ class ShareCoordinatorShardTest {
shard.replay(0L, 0L, (short) 0, result.records().get(0));
WriteShareGroupStateResponseData expectedData =
WriteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION);
- List<CoordinatorRecord> expectedRecords =
Collections.singletonList(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
+ List<CoordinatorRecord> expectedRecords =
List.of(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
GROUP_ID, TOPIC_ID, PARTITION,
ShareGroupOffset.fromRequest(request1.topics().get(0).partitions().get(0))
));
@@ -316,7 +316,7 @@ class ShareCoordinatorShardTest {
expectedData = WriteShareGroupStateResponse.toResponseData(TOPIC_ID,
PARTITION);
// The snapshot epoch here will be 1 since this is a snapshot update
record,
// and it refers to parent share snapshot.
- expectedRecords =
Collections.singletonList(ShareCoordinatorRecordHelpers.newShareSnapshotUpdateRecord(
+ expectedRecords =
List.of(ShareCoordinatorRecordHelpers.newShareSnapshotUpdateRecord(
GROUP_ID, TOPIC_ID, PARTITION,
ShareGroupOffset.fromRequest(request2.topics().get(0).partitions().get(0), 0)
));
@@ -329,7 +329,7 @@ class ShareCoordinatorShardTest {
assertEquals(incrementalUpdate.leaderEpoch(),
combinedState.leaderEpoch());
assertEquals(incrementalUpdate.startOffset(),
combinedState.startOffset());
// The batches should have combined to 1 since same state.
- assertEquals(Collections.singletonList(new PersisterStateBatch(0, 20,
(byte) 0, (short) 1)),
+ assertEquals(List.of(new PersisterStateBatch(0, 20, (byte) 0, (short)
1)),
combinedState.stateBatches());
assertEquals(0, shard.getLeaderMapValue(shareCoordinatorKey));
}
@@ -344,14 +344,14 @@ class ShareCoordinatorShardTest {
WriteShareGroupStateRequestData request = new
WriteShareGroupStateRequestData()
.setGroupId(GROUP_ID)
- .setTopics(Collections.singletonList(new
WriteShareGroupStateRequestData.WriteStateData()
+ .setTopics(List.of(new
WriteShareGroupStateRequestData.WriteStateData()
.setTopicId(TOPIC_ID)
- .setPartitions(Collections.singletonList(new
WriteShareGroupStateRequestData.PartitionData()
+ .setPartitions(List.of(new
WriteShareGroupStateRequestData.PartitionData()
.setPartition(partition)
.setStartOffset(0)
.setStateEpoch(0)
.setLeaderEpoch(0)
- .setStateBatches(Collections.singletonList(new
WriteShareGroupStateRequestData.StateBatch()
+ .setStateBatches(List.of(new
WriteShareGroupStateRequestData.StateBatch()
.setFirstOffset(0)
.setLastOffset(10)
.setDeliveryCount((short) 1)
@@ -361,7 +361,7 @@ class ShareCoordinatorShardTest {
WriteShareGroupStateResponseData expectedData =
WriteShareGroupStateResponse.toErrorResponseData(
TOPIC_ID, partition, Errors.INVALID_REQUEST,
ShareCoordinatorShard.NEGATIVE_PARTITION_ID.getMessage());
- List<CoordinatorRecord> expectedRecords = Collections.emptyList();
+ List<CoordinatorRecord> expectedRecords = List.of();
assertEquals(expectedData, result.response());
assertEquals(expectedRecords, result.records());
@@ -379,14 +379,14 @@ class ShareCoordinatorShardTest {
WriteShareGroupStateRequestData request = new
WriteShareGroupStateRequestData()
.setGroupId(GROUP_ID)
- .setTopics(Collections.singletonList(new
WriteShareGroupStateRequestData.WriteStateData()
+ .setTopics(List.of(new
WriteShareGroupStateRequestData.WriteStateData()
.setTopicId(TOPIC_ID)
- .setPartitions(Collections.singletonList(new
WriteShareGroupStateRequestData.PartitionData()
+ .setPartitions(List.of(new
WriteShareGroupStateRequestData.PartitionData()
.setPartition(0)
.setStartOffset(0)
.setStateEpoch(0)
.setLeaderEpoch(0)
- .setStateBatches(Collections.singletonList(new
WriteShareGroupStateRequestData.StateBatch()
+ .setStateBatches(List.of(new
WriteShareGroupStateRequestData.StateBatch()
.setFirstOffset(0)
.setLastOffset(10)
.setDeliveryCount((short) 1)
@@ -396,7 +396,7 @@ class ShareCoordinatorShardTest {
WriteShareGroupStateResponseData expectedData =
WriteShareGroupStateResponse.toErrorResponseData(
TOPIC_ID, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION,
Errors.UNKNOWN_TOPIC_OR_PARTITION.message());
- List<CoordinatorRecord> expectedRecords = Collections.emptyList();
+ List<CoordinatorRecord> expectedRecords = List.of();
assertEquals(expectedData, result.response());
assertEquals(expectedRecords, result.records());
@@ -413,14 +413,14 @@ class ShareCoordinatorShardTest {
WriteShareGroupStateRequestData request1 = new
WriteShareGroupStateRequestData()
.setGroupId(GROUP_ID)
- .setTopics(Collections.singletonList(new
WriteShareGroupStateRequestData.WriteStateData()
+ .setTopics(List.of(new
WriteShareGroupStateRequestData.WriteStateData()
.setTopicId(TOPIC_ID)
- .setPartitions(Collections.singletonList(new
WriteShareGroupStateRequestData.PartitionData()
+ .setPartitions(List.of(new
WriteShareGroupStateRequestData.PartitionData()
.setPartition(PARTITION)
.setStartOffset(0)
.setStateEpoch(0)
.setLeaderEpoch(5)
- .setStateBatches(Collections.singletonList(new
WriteShareGroupStateRequestData.StateBatch()
+ .setStateBatches(List.of(new
WriteShareGroupStateRequestData.StateBatch()
.setFirstOffset(0)
.setLastOffset(10)
.setDeliveryCount((short) 1)
@@ -428,14 +428,14 @@ class ShareCoordinatorShardTest {
WriteShareGroupStateRequestData request2 = new
WriteShareGroupStateRequestData()
.setGroupId(GROUP_ID)
- .setTopics(Collections.singletonList(new
WriteShareGroupStateRequestData.WriteStateData()
+ .setTopics(List.of(new
WriteShareGroupStateRequestData.WriteStateData()
.setTopicId(TOPIC_ID)
- .setPartitions(Collections.singletonList(new
WriteShareGroupStateRequestData.PartitionData()
+ .setPartitions(List.of(new
WriteShareGroupStateRequestData.PartitionData()
.setPartition(PARTITION)
.setStartOffset(0)
.setStateEpoch(0)
.setLeaderEpoch(3) // Lower leader epoch in the second
request.
- .setStateBatches(Collections.singletonList(new
WriteShareGroupStateRequestData.StateBatch()
+ .setStateBatches(List.of(new
WriteShareGroupStateRequestData.StateBatch()
.setFirstOffset(11)
.setLastOffset(20)
.setDeliveryCount((short) 1)
@@ -446,7 +446,7 @@ class ShareCoordinatorShardTest {
shard.replay(0L, 0L, (short) 0, result.records().get(0));
WriteShareGroupStateResponseData expectedData =
WriteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION);
- List<CoordinatorRecord> expectedRecords =
Collections.singletonList(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
+ List<CoordinatorRecord> expectedRecords =
List.of(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
GROUP_ID, TOPIC_ID, PARTITION,
ShareGroupOffset.fromRequest(request1.topics().get(0).partitions().get(0))
));
@@ -462,7 +462,7 @@ class ShareCoordinatorShardTest {
// Since the leader epoch in the second request was lower than the one
in the first request, FENCED_LEADER_EPOCH error is expected.
expectedData = WriteShareGroupStateResponse.toErrorResponseData(
TOPIC_ID, PARTITION, Errors.FENCED_LEADER_EPOCH,
Errors.FENCED_LEADER_EPOCH.message());
- expectedRecords = Collections.emptyList();
+ expectedRecords = List.of();
assertEquals(expectedData, result.response());
assertEquals(expectedRecords, result.records());
@@ -479,14 +479,14 @@ class ShareCoordinatorShardTest {
WriteShareGroupStateRequestData request1 = new
WriteShareGroupStateRequestData()
.setGroupId(GROUP_ID)
- .setTopics(Collections.singletonList(new
WriteShareGroupStateRequestData.WriteStateData()
+ .setTopics(List.of(new
WriteShareGroupStateRequestData.WriteStateData()
.setTopicId(TOPIC_ID)
- .setPartitions(Collections.singletonList(new
WriteShareGroupStateRequestData.PartitionData()
+ .setPartitions(List.of(new
WriteShareGroupStateRequestData.PartitionData()
.setPartition(PARTITION)
.setStartOffset(0)
.setStateEpoch(1)
.setLeaderEpoch(5)
- .setStateBatches(Collections.singletonList(new
WriteShareGroupStateRequestData.StateBatch()
+ .setStateBatches(List.of(new
WriteShareGroupStateRequestData.StateBatch()
.setFirstOffset(0)
.setLastOffset(10)
.setDeliveryCount((short) 1)
@@ -494,14 +494,14 @@ class ShareCoordinatorShardTest {
WriteShareGroupStateRequestData request2 = new
WriteShareGroupStateRequestData()
.setGroupId(GROUP_ID)
- .setTopics(Collections.singletonList(new
WriteShareGroupStateRequestData.WriteStateData()
+ .setTopics(List.of(new
WriteShareGroupStateRequestData.WriteStateData()
.setTopicId(TOPIC_ID)
- .setPartitions(Collections.singletonList(new
WriteShareGroupStateRequestData.PartitionData()
+ .setPartitions(List.of(new
WriteShareGroupStateRequestData.PartitionData()
.setPartition(PARTITION)
.setStartOffset(0)
.setStateEpoch(0) // Lower state epoch in the second
request.
.setLeaderEpoch(5)
- .setStateBatches(Collections.singletonList(new
WriteShareGroupStateRequestData.StateBatch()
+ .setStateBatches(List.of(new
WriteShareGroupStateRequestData.StateBatch()
.setFirstOffset(11)
.setLastOffset(20)
.setDeliveryCount((short) 1)
@@ -512,7 +512,7 @@ class ShareCoordinatorShardTest {
shard.replay(0L, 0L, (short) 0, result.records().get(0));
WriteShareGroupStateResponseData expectedData =
WriteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION);
- List<CoordinatorRecord> expectedRecords =
Collections.singletonList(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
+ List<CoordinatorRecord> expectedRecords =
List.of(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
GROUP_ID, TOPIC_ID, PARTITION,
ShareGroupOffset.fromRequest(request1.topics().get(0).partitions().get(0))
));
@@ -528,7 +528,7 @@ class ShareCoordinatorShardTest {
// Since the leader epoch in the second request was lower than the one
in the first request, FENCED_LEADER_EPOCH error is expected.
expectedData = WriteShareGroupStateResponse.toErrorResponseData(
TOPIC_ID, PARTITION, Errors.FENCED_STATE_EPOCH,
Errors.FENCED_STATE_EPOCH.message());
- expectedRecords = Collections.emptyList();
+ expectedRecords = List.of();
assertEquals(expectedData, result.response());
assertEquals(expectedRecords, result.records());
@@ -547,9 +547,9 @@ class ShareCoordinatorShardTest {
ReadShareGroupStateRequestData request = new
ReadShareGroupStateRequestData()
.setGroupId(GROUP_ID)
- .setTopics(Collections.singletonList(new
ReadShareGroupStateRequestData.ReadStateData()
+ .setTopics(List.of(new
ReadShareGroupStateRequestData.ReadStateData()
.setTopicId(TOPIC_ID)
- .setPartitions(Collections.singletonList(new
ReadShareGroupStateRequestData.PartitionData()
+ .setPartitions(List.of(new
ReadShareGroupStateRequestData.PartitionData()
.setPartition(PARTITION)
.setLeaderEpoch(1)))));
@@ -560,7 +560,7 @@ class ShareCoordinatorShardTest {
PARTITION,
0,
0,
- Collections.singletonList(new
ReadShareGroupStateResponseData.StateBatch()
+ List.of(new ReadShareGroupStateResponseData.StateBatch()
.setFirstOffset(0)
.setLastOffset(10)
.setDeliveryCount((short) 1)
@@ -581,9 +581,9 @@ class ShareCoordinatorShardTest {
ReadShareGroupStateSummaryRequestData request = new
ReadShareGroupStateSummaryRequestData()
.setGroupId(GROUP_ID)
- .setTopics(Collections.singletonList(new
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
+ .setTopics(List.of(new
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
.setTopicId(TOPIC_ID)
- .setPartitions(Collections.singletonList(new
ReadShareGroupStateSummaryRequestData.PartitionData()
+ .setPartitions(List.of(new
ReadShareGroupStateSummaryRequestData.PartitionData()
.setPartition(PARTITION)
.setLeaderEpoch(1)))));
@@ -611,9 +611,9 @@ class ShareCoordinatorShardTest {
ReadShareGroupStateRequestData request = new
ReadShareGroupStateRequestData()
.setGroupId(GROUP_ID)
- .setTopics(Collections.singletonList(new
ReadShareGroupStateRequestData.ReadStateData()
+ .setTopics(List.of(new
ReadShareGroupStateRequestData.ReadStateData()
.setTopicId(TOPIC_ID)
- .setPartitions(Collections.singletonList(new
ReadShareGroupStateRequestData.PartitionData()
+ .setPartitions(List.of(new
ReadShareGroupStateRequestData.PartitionData()
.setPartition(partition)
.setLeaderEpoch(5)))));
@@ -640,9 +640,9 @@ class ShareCoordinatorShardTest {
ReadShareGroupStateSummaryRequestData request = new
ReadShareGroupStateSummaryRequestData()
.setGroupId(GROUP_ID)
- .setTopics(Collections.singletonList(new
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
+ .setTopics(List.of(new
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
.setTopicId(TOPIC_ID)
- .setPartitions(Collections.singletonList(new
ReadShareGroupStateSummaryRequestData.PartitionData()
+ .setPartitions(List.of(new
ReadShareGroupStateSummaryRequestData.PartitionData()
.setPartition(partition)
.setLeaderEpoch(5)))));
@@ -669,9 +669,9 @@ class ShareCoordinatorShardTest {
ReadShareGroupStateRequestData request = new
ReadShareGroupStateRequestData()
.setGroupId(GROUP_ID)
- .setTopics(Collections.singletonList(new
ReadShareGroupStateRequestData.ReadStateData()
+ .setTopics(List.of(new
ReadShareGroupStateRequestData.ReadStateData()
.setTopicId(TOPIC_ID)
- .setPartitions(Collections.singletonList(new
ReadShareGroupStateRequestData.PartitionData()
+ .setPartitions(List.of(new
ReadShareGroupStateRequestData.PartitionData()
.setPartition(0)
.setLeaderEpoch(5)))));
@@ -698,9 +698,9 @@ class ShareCoordinatorShardTest {
ReadShareGroupStateRequestData request = new
ReadShareGroupStateRequestData()
.setGroupId(GROUP_ID)
- .setTopics(Collections.singletonList(new
ReadShareGroupStateRequestData.ReadStateData()
+ .setTopics(List.of(new
ReadShareGroupStateRequestData.ReadStateData()
.setTopicId(TOPIC_ID)
- .setPartitions(Collections.singletonList(new
ReadShareGroupStateRequestData.PartitionData()
+ .setPartitions(List.of(new
ReadShareGroupStateRequestData.PartitionData()
.setPartition(PARTITION)
.setLeaderEpoch(3))))); // Lower leaderEpoch than the one
stored in leaderMap.
@@ -753,9 +753,9 @@ class ShareCoordinatorShardTest {
// Set initial state.
WriteShareGroupStateRequestData request = new
WriteShareGroupStateRequestData()
.setGroupId(GROUP_ID)
- .setTopics(Collections.singletonList(new
WriteShareGroupStateRequestData.WriteStateData()
+ .setTopics(List.of(new
WriteShareGroupStateRequestData.WriteStateData()
.setTopicId(TOPIC_ID)
- .setPartitions(Collections.singletonList(new
WriteShareGroupStateRequestData.PartitionData()
+ .setPartitions(List.of(new
WriteShareGroupStateRequestData.PartitionData()
.setPartition(PARTITION)
.setStartOffset(100)
.setStateEpoch(0)
@@ -784,7 +784,7 @@ class ShareCoordinatorShardTest {
shard.replay(0L, 0L, (short) 0, result.records().get(0));
WriteShareGroupStateResponseData expectedData =
WriteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION);
- List<CoordinatorRecord> expectedRecords =
Collections.singletonList(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
+ List<CoordinatorRecord> expectedRecords =
List.of(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
GROUP_ID, TOPIC_ID, PARTITION,
ShareGroupOffset.fromRequest(request.topics().get(0).partitions().get(0))
));
@@ -800,14 +800,14 @@ class ShareCoordinatorShardTest {
// Acknowledge b1.
WriteShareGroupStateRequestData requestUpdateB1 = new
WriteShareGroupStateRequestData()
.setGroupId(GROUP_ID)
- .setTopics(Collections.singletonList(new
WriteShareGroupStateRequestData.WriteStateData()
+ .setTopics(List.of(new
WriteShareGroupStateRequestData.WriteStateData()
.setTopicId(TOPIC_ID)
- .setPartitions(Collections.singletonList(new
WriteShareGroupStateRequestData.PartitionData()
+ .setPartitions(List.of(new
WriteShareGroupStateRequestData.PartitionData()
.setPartition(PARTITION)
.setStartOffset(-1)
.setStateEpoch(0)
.setLeaderEpoch(0)
- .setStateBatches(Collections.singletonList(
+ .setStateBatches(List.of(
new WriteShareGroupStateRequestData.StateBatch()
//b1
.setFirstOffset(100)
.setLastOffset(109)
@@ -822,14 +822,14 @@ class ShareCoordinatorShardTest {
// Ack batch 3 and move start offset.
WriteShareGroupStateRequestData requestUpdateStartOffsetAndB3 = new
WriteShareGroupStateRequestData()
.setGroupId(GROUP_ID)
- .setTopics(Collections.singletonList(new
WriteShareGroupStateRequestData.WriteStateData()
+ .setTopics(List.of(new
WriteShareGroupStateRequestData.WriteStateData()
.setTopicId(TOPIC_ID)
- .setPartitions(Collections.singletonList(new
WriteShareGroupStateRequestData.PartitionData()
+ .setPartitions(List.of(new
WriteShareGroupStateRequestData.PartitionData()
.setPartition(PARTITION)
.setStartOffset(110) // 100 -> 110
.setStateEpoch(0)
.setLeaderEpoch(0)
- .setStateBatches(Collections.singletonList(
+ .setStateBatches(List.of(
new WriteShareGroupStateRequestData.StateBatch()
//b3
.setFirstOffset(120)
.setLastOffset(129)
@@ -852,7 +852,7 @@ class ShareCoordinatorShardTest {
new PersisterStateBatch(120, 129, (byte) 2, (short) 1)
))
.build();
- List<CoordinatorRecord> expectedRecordsFinal =
Collections.singletonList(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
+ List<CoordinatorRecord> expectedRecordsFinal =
List.of(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
GROUP_ID, TOPIC_ID, PARTITION, offsetFinal
));
@@ -874,7 +874,7 @@ class ShareCoordinatorShardTest {
.build();
when(manager.lastRedundantOffset()).thenReturn(Optional.of(10L));
- assertEquals(new CoordinatorResult<>(Collections.emptyList(),
Optional.of(10L)), shard.lastRedundantOffset());
+ assertEquals(new CoordinatorResult<>(List.of(), Optional.of(10L)),
shard.lastRedundantOffset());
}
@Test
@@ -885,9 +885,9 @@ class ShareCoordinatorShardTest {
ReadShareGroupStateRequestData request = new
ReadShareGroupStateRequestData()
.setGroupId(GROUP_ID)
- .setTopics(Collections.singletonList(new
ReadShareGroupStateRequestData.ReadStateData()
+ .setTopics(List.of(new
ReadShareGroupStateRequestData.ReadStateData()
.setTopicId(TOPIC_ID)
- .setPartitions(Collections.singletonList(new
ReadShareGroupStateRequestData.PartitionData()
+ .setPartitions(List.of(new
ReadShareGroupStateRequestData.PartitionData()
.setPartition(PARTITION)
.setLeaderEpoch(2)
))));
@@ -900,12 +900,12 @@ class ShareCoordinatorShardTest {
TOPIC_ID, PARTITION,
PartitionFactory.UNINITIALIZED_START_OFFSET,
PartitionFactory.DEFAULT_STATE_EPOCH,
- Collections.emptyList());
- List<CoordinatorRecord> expectedRecords =
Collections.singletonList(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
+ List.of());
+ List<CoordinatorRecord> expectedRecords =
List.of(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
GROUP_ID, TOPIC_ID, PARTITION, new ShareGroupOffset.Builder()
.setStartOffset(PartitionFactory.UNINITIALIZED_START_OFFSET)
.setLeaderEpoch(2)
- .setStateBatches(Collections.emptyList())
+ .setStateBatches(List.of())
.setSnapshotEpoch(0)
.setStateEpoch(PartitionFactory.DEFAULT_STATE_EPOCH)
.build()
@@ -925,9 +925,9 @@ class ShareCoordinatorShardTest {
ReadShareGroupStateRequestData request1 = new
ReadShareGroupStateRequestData()
.setGroupId(GROUP_ID)
- .setTopics(Collections.singletonList(new
ReadShareGroupStateRequestData.ReadStateData()
+ .setTopics(List.of(new
ReadShareGroupStateRequestData.ReadStateData()
.setTopicId(TOPIC_ID)
- .setPartitions(Collections.singletonList(new
ReadShareGroupStateRequestData.PartitionData()
+ .setPartitions(List.of(new
ReadShareGroupStateRequestData.PartitionData()
.setPartition(PARTITION)
.setLeaderEpoch(2)
))));
@@ -940,9 +940,9 @@ class ShareCoordinatorShardTest {
ReadShareGroupStateRequestData request2 = new
ReadShareGroupStateRequestData()
.setGroupId(GROUP_ID)
- .setTopics(Collections.singletonList(new
ReadShareGroupStateRequestData.ReadStateData()
+ .setTopics(List.of(new
ReadShareGroupStateRequestData.ReadStateData()
.setTopicId(TOPIC_ID)
- .setPartitions(Collections.singletonList(new
ReadShareGroupStateRequestData.PartitionData()
+ .setPartitions(List.of(new
ReadShareGroupStateRequestData.PartitionData()
.setPartition(PARTITION)
.setLeaderEpoch(-1)
))));
@@ -953,9 +953,9 @@ class ShareCoordinatorShardTest {
ReadShareGroupStateRequestData request3 = new
ReadShareGroupStateRequestData()
.setGroupId(GROUP_ID)
- .setTopics(Collections.singletonList(new
ReadShareGroupStateRequestData.ReadStateData()
+ .setTopics(List.of(new
ReadShareGroupStateRequestData.ReadStateData()
.setTopicId(TOPIC_ID)
- .setPartitions(Collections.singletonList(new
ReadShareGroupStateRequestData.PartitionData()
+ .setPartitions(List.of(new
ReadShareGroupStateRequestData.PartitionData()
.setPartition(PARTITION)
.setLeaderEpoch(-1)
))));
@@ -974,9 +974,9 @@ class ShareCoordinatorShardTest {
DeleteShareGroupStateRequestData request = new
DeleteShareGroupStateRequestData()
.setGroupId(GROUP_ID)
- .setTopics(Collections.singletonList(new
DeleteShareGroupStateRequestData.DeleteStateData()
+ .setTopics(List.of(new
DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(TOPIC_ID)
- .setPartitions(Collections.singletonList(new
DeleteShareGroupStateRequestData.PartitionData()
+ .setPartitions(List.of(new
DeleteShareGroupStateRequestData.PartitionData()
.setPartition(PARTITION)))));
CoordinatorResult<DeleteShareGroupStateResponseData,
CoordinatorRecord> result = shard.deleteState(request);
@@ -1031,9 +1031,9 @@ class ShareCoordinatorShardTest {
DeleteShareGroupStateRequestData request = new
DeleteShareGroupStateRequestData()
.setGroupId(GROUP_ID)
- .setTopics(Collections.singletonList(new
DeleteShareGroupStateRequestData.DeleteStateData()
+ .setTopics(List.of(new
DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(TOPIC_ID)
- .setPartitions(Collections.singletonList(new
DeleteShareGroupStateRequestData.PartitionData()
+ .setPartitions(List.of(new
DeleteShareGroupStateRequestData.PartitionData()
.setPartition(PARTITION)))));
CoordinatorResult<DeleteShareGroupStateResponseData,
CoordinatorRecord> result = shard.deleteState(request);
diff --git
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorTestConfig.java
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorTestConfig.java
index 75916187b28..eab6f2966ac 100644
---
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorTestConfig.java
+++
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorTestConfig.java
@@ -29,7 +29,7 @@ import java.util.Map;
public class ShareCoordinatorTestConfig {
- private static final List<ConfigDef> CONFIG_DEF_LIST =
Collections.singletonList(
+ private static final List<ConfigDef> CONFIG_DEF_LIST = List.of(
ShareCoordinatorConfig.CONFIG_DEF
);