This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7d54f7b036c MINOR: Cleanup DelayedOperationKey (#20947)
7d54f7b036c is described below
commit 7d54f7b036c15553558ef5638f559622d7b23d4a
Author: Jhen-Yung Hsu <[email protected]>
AuthorDate: Tue Nov 25 04:55:55 2025 +0800
MINOR: Cleanup DelayedOperationKey (#20947)
Refactor the subclasses of DelayedOperationKey using `record`.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../purgatory/TopicPartitionOperationKey.java | 25 +---------------
.../server/purgatory/DelayedOperationTest.java | 22 +-------------
.../share/fetch/DelayedShareFetchGroupKey.java | 34 +---------------------
.../share/fetch/DelayedShareFetchPartitionKey.java | 30 +------------------
.../purgatory/DelayedRemoteListOffsetsTest.java | 2 +-
5 files changed, 5 insertions(+), 108 deletions(-)
diff --git
a/server-common/src/main/java/org/apache/kafka/server/purgatory/TopicPartitionOperationKey.java
b/server-common/src/main/java/org/apache/kafka/server/purgatory/TopicPartitionOperationKey.java
index ac7b236eda0..209333cf2ef 100644
---
a/server-common/src/main/java/org/apache/kafka/server/purgatory/TopicPartitionOperationKey.java
+++
b/server-common/src/main/java/org/apache/kafka/server/purgatory/TopicPartitionOperationKey.java
@@ -19,20 +19,10 @@ package org.apache.kafka.server.purgatory;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
-import java.util.Objects;
-
/**
* Used by delayed-produce and delayed-fetch operations
*/
-public class TopicPartitionOperationKey implements DelayedOperationKey {
-
- public final String topic;
- public final int partition;
-
- public TopicPartitionOperationKey(String topic, int partition) {
- this.topic = topic;
- this.partition = partition;
- }
+public record TopicPartitionOperationKey(String topic, int partition)
implements DelayedOperationKey {
public TopicPartitionOperationKey(TopicPartition tp) {
this(tp.topic(), tp.partition());
@@ -46,17 +36,4 @@ public class TopicPartitionOperationKey implements
DelayedOperationKey {
public String keyLabel() {
return topic + "-" + partition;
}
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- TopicPartitionOperationKey that = (TopicPartitionOperationKey) o;
- return partition == that.partition && Objects.equals(topic,
that.topic);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(topic, partition);
- }
}
diff --git
a/server-common/src/test/java/org/apache/kafka/server/purgatory/DelayedOperationTest.java
b/server-common/src/test/java/org/apache/kafka/server/purgatory/DelayedOperationTest.java
index e7fb9a0e2c8..4f4ce9de48e 100644
---
a/server-common/src/test/java/org/apache/kafka/server/purgatory/DelayedOperationTest.java
+++
b/server-common/src/test/java/org/apache/kafka/server/purgatory/DelayedOperationTest.java
@@ -24,7 +24,6 @@ import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.List;
-import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.ExecutionException;
@@ -63,26 +62,7 @@ public class DelayedOperationTest {
executorService.shutdown();
}
- private static class MockKey implements DelayedOperationKey {
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- MockKey mockKey = (MockKey) o;
- return Objects.equals(key, mockKey.key);
- }
-
- @Override
- public int hashCode() {
- return key != null ? key.hashCode() : 0;
- }
-
- final String key;
-
- MockKey(String key) {
- this.key = key;
- }
-
+ private record MockKey(String key) implements DelayedOperationKey {
@Override
public String keyLabel() {
return key;
diff --git
a/server/src/main/java/org/apache/kafka/server/share/fetch/DelayedShareFetchGroupKey.java
b/server/src/main/java/org/apache/kafka/server/share/fetch/DelayedShareFetchGroupKey.java
index 88dbabf6e5c..68a3b307442 100644
---
a/server/src/main/java/org/apache/kafka/server/share/fetch/DelayedShareFetchGroupKey.java
+++
b/server/src/main/java/org/apache/kafka/server/share/fetch/DelayedShareFetchGroupKey.java
@@ -19,47 +19,15 @@ package org.apache.kafka.server.share.fetch;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
-import java.util.Objects;
-
/**
* A key for delayed share fetch purgatory that refers to the share partition.
*/
-public class DelayedShareFetchGroupKey implements DelayedShareFetchKey {
- private final String groupId;
- private final Uuid topicId;
- private final int partition;
+public record DelayedShareFetchGroupKey(String groupId, Uuid topicId, int
partition) implements DelayedShareFetchKey {
public DelayedShareFetchGroupKey(String groupId, TopicIdPartition
topicIdPartition) {
this(groupId, topicIdPartition.topicId(),
topicIdPartition.partition());
}
- public DelayedShareFetchGroupKey(String groupId, Uuid topicId, int
partition) {
- this.groupId = groupId;
- this.topicId = topicId;
- this.partition = partition;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- DelayedShareFetchGroupKey that = (DelayedShareFetchGroupKey) o;
- return topicId.equals(that.topicId) && partition == that.partition &&
groupId.equals(that.groupId);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(topicId, partition, groupId);
- }
-
- @Override
- public String toString() {
- return "DelayedShareFetchGroupKey(groupId=" + groupId +
- ", topicId=" + topicId +
- ", partition=" + partition +
- ")";
- }
-
@Override
public String keyLabel() {
return String.format("groupId=%s, topicId=%s, partition=%s", groupId,
topicId, partition);
diff --git
a/server/src/main/java/org/apache/kafka/server/share/fetch/DelayedShareFetchPartitionKey.java
b/server/src/main/java/org/apache/kafka/server/share/fetch/DelayedShareFetchPartitionKey.java
index c1e40975c2b..01fe966117e 100644
---
a/server/src/main/java/org/apache/kafka/server/share/fetch/DelayedShareFetchPartitionKey.java
+++
b/server/src/main/java/org/apache/kafka/server/share/fetch/DelayedShareFetchPartitionKey.java
@@ -19,43 +19,15 @@ package org.apache.kafka.server.share.fetch;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
-import java.util.Objects;
-
/**
* A key for delayed share fetch purgatory that refers to the topic partition.
*/
-public class DelayedShareFetchPartitionKey implements DelayedShareFetchKey {
- private final Uuid topicId;
- private final int partition;
+public record DelayedShareFetchPartitionKey(Uuid topicId, int partition)
implements DelayedShareFetchKey {
public DelayedShareFetchPartitionKey(TopicIdPartition topicIdPartition) {
this(topicIdPartition.topicId(), topicIdPartition.partition());
}
- public DelayedShareFetchPartitionKey(Uuid topicId, int partition) {
- this.topicId = topicId;
- this.partition = partition;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- DelayedShareFetchPartitionKey that = (DelayedShareFetchPartitionKey) o;
- return topicId.equals(that.topicId) && partition == that.partition;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(topicId, partition);
- }
-
- @Override
- public String toString() {
- return "DelayedShareFetchPartitionKey(topicId=" + topicId +
- ", partition=" + partition + ")";
- }
-
@Override
public String keyLabel() {
return String.format("topicId=%s, partition=%s", topicId, partition);
diff --git
a/storage/src/test/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsetsTest.java
b/storage/src/test/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsetsTest.java
index 81b9073377a..6586bf35ae0 100644
---
a/storage/src/test/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsetsTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsetsTest.java
@@ -100,7 +100,7 @@ public class DelayedRemoteListOffsetsTest {
assertEquals(numResponse.get(), listOffsetsRequestKeys.size());
assertEquals(listOffsetsRequestKeys.size(),
DelayedRemoteListOffsets.AGGREGATE_EXPIRATION_METER.count());
listOffsetsRequestKeys.forEach(key -> {
- TopicPartition tp = new TopicPartition(key.topic, key.partition);
+ TopicPartition tp = new TopicPartition(key.topic(),
key.partition());
assertEquals(1,
DelayedRemoteListOffsets.PARTITION_EXPIRATION_METERS.get(tp).count());
});
}