This is an automated email from the ASF dual-hosted git repository. schofielaj pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 9f293866ab2 MINOR: Cleanup Share Coordinator (#19770) 9f293866ab2 is described below commit 9f293866ab2a33b7fa3114ffa4638f03fdb59bc9 Author: Sanskar Jhajharia <sjhajha...@confluent.io> AuthorDate: Tue May 20 17:03:20 2025 +0530 MINOR: Cleanup Share Coordinator (#19770) Now that Kafka Brokers support Java 17, this PR updates the share coordinator module to get rid of older code. The changes mostly include: - Collections.emptyList(), Collections.singletonList() and Arrays.asList() are replaced with List.of() - Collections.emptyMap() and Collections.singletonMap() are replaced with Map.of() - Collections.singleton() is replaced with Set.of() Reviewers: Andrew Schofield <aschofi...@confluent.io> --- .../java/org/apache/kafka/coordinator/share/ShareGroupOffset.java | 2 +- .../kafka/coordinator/share/metrics/ShareCoordinatorMetrics.java | 4 ++-- .../apache/kafka/coordinator/share/ShareCoordinatorShardTest.java | 8 +++----- .../coordinator/share/metrics/ShareCoordinatorMetricsTest.java | 4 ++-- 4 files changed, 8 insertions(+), 10 deletions(-) diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareGroupOffset.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareGroupOffset.java index c0397a273f4..2ca1a646885 100644 --- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareGroupOffset.java +++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareGroupOffset.java @@ -190,7 +190,7 @@ public class ShareGroupOffset { } public Builder setStateBatches(List<PersisterStateBatch> stateBatches) { - this.stateBatches = stateBatches == null ? Collections.emptyList() : stateBatches.stream().toList(); + this.stateBatches = stateBatches == null ? 
List.of() : stateBatches.stream().toList(); return this; } diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/metrics/ShareCoordinatorMetrics.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/metrics/ShareCoordinatorMetrics.java index 9081f384a48..c51feaffd1e 100644 --- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/metrics/ShareCoordinatorMetrics.java +++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/metrics/ShareCoordinatorMetrics.java @@ -31,8 +31,8 @@ import org.apache.kafka.timeline.SnapshotRegistry; import com.yammer.metrics.core.MetricsRegistry; -import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; @@ -91,7 +91,7 @@ public class ShareCoordinatorMetrics extends CoordinatorMetrics implements AutoC @Override public void close() throws Exception { - Arrays.asList( + List.of( SHARE_COORDINATOR_WRITE_SENSOR_NAME, SHARE_COORDINATOR_WRITE_LATENCY_SENSOR_NAME ).forEach(metrics::removeSensor); diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java index f1528570790..5f91b018bee 100644 --- a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java +++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java @@ -59,8 +59,6 @@ import org.apache.kafka.timeline.SnapshotRegistry; import org.junit.jupiter.api.Test; -import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -760,7 +758,7 @@ class ShareCoordinatorShardTest { // -Share coordinator writes the snapshot with startOffset 110 and batch 3. 
// -batch2 should NOT be lost ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder() - .setConfigOverrides(Collections.singletonMap(ShareCoordinatorConfig.SNAPSHOT_UPDATE_RECORDS_PER_SNAPSHOT_CONFIG, "0")) + .setConfigOverrides(Map.of(ShareCoordinatorConfig.SNAPSHOT_UPDATE_RECORDS_PER_SNAPSHOT_CONFIG, "0")) .build(); SharePartitionKey shareCoordinatorKey = SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, PARTITION); @@ -775,7 +773,7 @@ class ShareCoordinatorShardTest { .setStartOffset(100) .setStateEpoch(0) .setLeaderEpoch(0) - .setStateBatches(Arrays.asList( + .setStateBatches(List.of( new WriteShareGroupStateRequestData.StateBatch() //b1 .setFirstOffset(100) .setLastOffset(109) @@ -862,7 +860,7 @@ class ShareCoordinatorShardTest { .setLeaderEpoch(0) .setStateEpoch(0) .setSnapshotEpoch(2) // since 2nd share snapshot - .setStateBatches(Arrays.asList( + .setStateBatches(List.of( new PersisterStateBatch(110, 119, (byte) 1, (short) 2), // b2 not lost new PersisterStateBatch(120, 129, (byte) 2, (short) 1) )) diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/metrics/ShareCoordinatorMetricsTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/metrics/ShareCoordinatorMetricsTest.java index 39ca697f069..6b3c0a6490b 100644 --- a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/metrics/ShareCoordinatorMetricsTest.java +++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/metrics/ShareCoordinatorMetricsTest.java @@ -26,8 +26,8 @@ import org.apache.kafka.timeline.SnapshotRegistry; import org.junit.jupiter.api.Test; -import java.util.Arrays; import java.util.HashSet; +import java.util.List; import java.util.Map; import static org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_LATENCY_SENSOR_NAME; @@ -42,7 +42,7 @@ public class ShareCoordinatorMetricsTest { public void testMetricNames() { Metrics metrics = new Metrics(); - 
HashSet<MetricName> expectedMetrics = new HashSet<>(Arrays.asList( + HashSet<MetricName> expectedMetrics = new HashSet<>(List.of( metrics.metricName("write-rate", ShareCoordinatorMetrics.METRICS_GROUP), metrics.metricName("write-total", ShareCoordinatorMetrics.METRICS_GROUP), metrics.metricName("write-latency-avg", ShareCoordinatorMetrics.METRICS_GROUP),