This is an automated email from the ASF dual-hosted git repository.

jlprat pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4e862c09031 KAFKA-15875: Stops leak Snapshot in public methods (#16807)
4e862c09031 is described below

commit 4e862c0903195eb11a715271c29304d28036273d
Author: Josep Prat <[email protected]>
AuthorDate: Thu Aug 8 20:05:47 2024 +0200

    KAFKA-15875: Stops leak Snapshot in public methods (#16807)
    
    * KAFKA-15875: Stops leak Snapshot in public methods
    
    The Snapshot class is package-private, but it is returned by
    several public methods in SnapshotRegistry.
    To prevent this accidental leakage, these methods are made
    package-private as well. For getOrCreateSnapshot, a new public
    method called idempotentCreateSnapshot is created that returns void.
    * Make Builder package-private, replace <br> with <p>
    
    Reviewers: Greg Harris <[email protected]>
---
 .../group/runtime/SnapshottableCoordinator.java    |  4 +-
 .../group/GroupMetadataManagerTestContext.java     |  4 +-
 .../group/OffsetMetadataManagerTest.java           |  4 +-
 .../metrics/GroupCoordinatorMetricsShardTest.java  | 12 ++---
 .../group/metrics/GroupCoordinatorMetricsTest.java |  4 +-
 .../group/modern/consumer/ConsumerGroupTest.java   | 14 +++---
 .../group/modern/share/ShareGroupTest.java         | 12 ++---
 .../jmh/timeline/TimelineHashMapBenchmark.java     |  2 +-
 .../kafka/controller/OffsetControlManager.java     | 18 +++----
 .../kafka/controller/AclControlManagerTest.java    |  2 +-
 .../controller/FeatureControlManagerTest.java      | 11 ++---
 .../apache/kafka/timeline/SnapshotRegistry.java    | 34 ++++++++-----
 .../kafka/timeline/SnapshottableHashTable.java     | 55 +++++-----------------
 13 files changed, 78 insertions(+), 98 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/SnapshottableCoordinator.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/SnapshottableCoordinator.java
index b7f74748c45..467ed84da01 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/SnapshottableCoordinator.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/SnapshottableCoordinator.java
@@ -74,7 +74,7 @@ class SnapshottableCoordinator<S extends CoordinatorShard<U>, 
U> implements Coor
         this.tp = tp;
         this.lastWrittenOffset = 0;
         this.lastCommittedOffset = 0;
-        snapshotRegistry.getOrCreateSnapshot(0);
+        snapshotRegistry.idempotentCreateSnapshot(0);
     }
 
     /**
@@ -145,7 +145,7 @@ class SnapshottableCoordinator<S extends 
CoordinatorShard<U>, U> implements Coor
         }
 
         lastWrittenOffset = offset;
-        snapshotRegistry.getOrCreateSnapshot(offset);
+        snapshotRegistry.idempotentCreateSnapshot(offset);
         log.debug("Updated last written offset of {} to {}.", tp, offset);
     }
 
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
index cef907da2c1..d9add89eb50 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
@@ -532,7 +532,7 @@ public class GroupMetadataManagerTestContext {
         this.groupMetadataManager = groupMetadataManager;
         this.classicGroupInitialRebalanceDelayMs = 
classicGroupInitialRebalanceDelayMs;
         this.classicGroupNewMemberJoinTimeoutMs = 
classicGroupNewMemberJoinTimeoutMs;
-        snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset);
+        snapshotRegistry.idempotentCreateSnapshot(lastWrittenOffset);
     }
 
     public void commit() {
@@ -1512,7 +1512,7 @@ public class GroupMetadataManagerTestContext {
         }
 
         lastWrittenOffset++;
-        snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset);
+        snapshotRegistry.idempotentCreateSnapshot(lastWrittenOffset);
     }
 
     void onUnloaded() {
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
index f4334dc6e54..a8d019258f4 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
@@ -481,7 +481,7 @@ public class OffsetMetadataManagerTest {
             long producerId,
             CoordinatorRecord record
         ) {
-            snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset);
+            snapshotRegistry.idempotentCreateSnapshot(lastWrittenOffset);
 
             ApiMessageAndVersion key = record.key();
             ApiMessageAndVersion value = record.value();
@@ -512,7 +512,7 @@ public class OffsetMetadataManagerTest {
             long producerId,
             TransactionResult result
         ) {
-            snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset);
+            snapshotRegistry.idempotentCreateSnapshot(lastWrittenOffset);
             offsetMetadataManager.replayEndTransactionMarker(producerId, 
result);
             lastWrittenOffset++;
         }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShardTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShardTest.java
index a68cd4a4ab2..950c359294a 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShardTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShardTest.java
@@ -61,7 +61,7 @@ public class GroupCoordinatorMetricsShardTest {
         
shard.incrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.STABLE);
         
shard.incrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.DEAD);
 
-        snapshotRegistry.getOrCreateSnapshot(1000);
+        snapshotRegistry.idempotentCreateSnapshot(1000);
         // The value should not be updated until the offset has been committed.
         assertEquals(0, shard.numOffsets());
         assertEquals(0, shard.numConsumerGroups());
@@ -87,7 +87,7 @@ public class GroupCoordinatorMetricsShardTest {
         
shard.decrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.STABLE);
         
shard.decrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.DEAD);
 
-        snapshotRegistry.getOrCreateSnapshot(2000);
+        snapshotRegistry.idempotentCreateSnapshot(2000);
         shard.commitUpTo(2000);
         assertEquals(0, shard.numOffsets());
         assertEquals(0, shard.numConsumerGroups());
@@ -184,7 +184,7 @@ public class GroupCoordinatorMetricsShardTest {
 
         IntStream.range(0, 4).forEach(__ -> 
shard.incrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY));
 
-        snapshotRegistry.getOrCreateSnapshot(1000);
+        snapshotRegistry.idempotentCreateSnapshot(1000);
         shard.commitUpTo(1000);
         assertEquals(4, shard.numConsumerGroups());
         assertEquals(4, 
shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY));
@@ -198,7 +198,7 @@ public class GroupCoordinatorMetricsShardTest {
         group2.updateMember(member2);
         group3.updateMember(member3);
 
-        snapshotRegistry.getOrCreateSnapshot(2000);
+        snapshotRegistry.idempotentCreateSnapshot(2000);
         shard.commitUpTo(2000);
         assertEquals(0, 
shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY));
         assertEquals(4, 
shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.STABLE));
@@ -206,7 +206,7 @@ public class GroupCoordinatorMetricsShardTest {
         group2.setGroupEpoch(1);
         group3.setGroupEpoch(1);
 
-        snapshotRegistry.getOrCreateSnapshot(3000);
+        snapshotRegistry.idempotentCreateSnapshot(3000);
         shard.commitUpTo(3000);
         assertEquals(0, 
shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY));
         assertEquals(2, 
shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.ASSIGNING));
@@ -219,7 +219,7 @@ public class GroupCoordinatorMetricsShardTest {
             
.setPartitionsPendingRevocation(Collections.singletonMap(Uuid.ZERO_UUID, 
Collections.singleton(0)))
             .build();
 
-        snapshotRegistry.getOrCreateSnapshot(4000);
+        snapshotRegistry.idempotentCreateSnapshot(4000);
         shard.commitUpTo(4000);
         assertEquals(0, 
shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY));
         assertEquals(1, 
shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.ASSIGNING));
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java
index 3631a581cd3..31995a71757 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java
@@ -154,8 +154,8 @@ public class GroupCoordinatorMetricsTest {
             9
         );
 
-        snapshotRegistry0.getOrCreateSnapshot(1000);
-        snapshotRegistry1.getOrCreateSnapshot(1500);
+        snapshotRegistry0.idempotentCreateSnapshot(1000);
+        snapshotRegistry1.idempotentCreateSnapshot(1500);
         shard0.commitUpTo(1000);
         shard1.commitUpTo(1500);
 
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
index c5cd2ab194c..e82128ab594 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
@@ -1110,12 +1110,12 @@ public class ConsumerGroupTest {
             new TopicPartition("__consumer_offsets", 0)
         );
         ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo", 
metricsShard);
-        snapshotRegistry.getOrCreateSnapshot(0);
+        snapshotRegistry.idempotentCreateSnapshot(0);
         assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY.toString(), 
group.stateAsString(0));
         group.updateMember(new ConsumerGroupMember.Builder("member1")
             .setSubscribedTopicNames(Collections.singletonList("foo"))
             .build());
-        snapshotRegistry.getOrCreateSnapshot(1);
+        snapshotRegistry.idempotentCreateSnapshot(1);
         assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY.toString(), 
group.stateAsString(0));
         assertEquals(ConsumerGroup.ConsumerGroupState.STABLE.toString(), 
group.stateAsString(1));
     }
@@ -1137,7 +1137,7 @@ public class ConsumerGroupTest {
             group.validateOffsetFetch("member-id", 0, Long.MAX_VALUE));
 
         // Create a member.
-        snapshotRegistry.getOrCreateSnapshot(0);
+        snapshotRegistry.idempotentCreateSnapshot(0);
         group.updateMember(new 
ConsumerGroupMember.Builder("member-id").build());
 
         // The member does not exist at last committed offset 0.
@@ -1244,7 +1244,7 @@ public class ConsumerGroupTest {
     public void testAsDescribedGroup() {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new 
LogContext());
         ConsumerGroup group = new ConsumerGroup(snapshotRegistry, 
"group-id-1", mock(GroupCoordinatorMetricsShard.class));
-        snapshotRegistry.getOrCreateSnapshot(0);
+        snapshotRegistry.idempotentCreateSnapshot(0);
         assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY.toString(), 
group.stateAsString(0));
 
         group.updateMember(new ConsumerGroupMember.Builder("member1")
@@ -1253,7 +1253,7 @@ public class ConsumerGroupTest {
                 .build());
         group.updateMember(new ConsumerGroupMember.Builder("member2")
                 .build());
-        snapshotRegistry.getOrCreateSnapshot(1);
+        snapshotRegistry.idempotentCreateSnapshot(1);
 
         ConsumerGroupDescribeResponseData.DescribedGroup expected = new 
ConsumerGroupDescribeResponseData.DescribedGroup()
             .setGroupId("group-id-1")
@@ -1324,14 +1324,14 @@ public class ConsumerGroupTest {
             new TopicPartition("__consumer_offsets", 0)
         );
         ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo", 
metricsShard);
-        snapshotRegistry.getOrCreateSnapshot(0);
+        snapshotRegistry.idempotentCreateSnapshot(0);
         assertTrue(group.isInStates(Collections.singleton("empty"), 0));
         assertFalse(group.isInStates(Collections.singleton("Empty"), 0));
 
         group.updateMember(new ConsumerGroupMember.Builder("member1")
             .setSubscribedTopicNames(Collections.singletonList("foo"))
             .build());
-        snapshotRegistry.getOrCreateSnapshot(1);
+        snapshotRegistry.idempotentCreateSnapshot(1);
         assertTrue(group.isInStates(Collections.singleton("empty"), 0));
         assertTrue(group.isInStates(Collections.singleton("stable"), 1));
         assertFalse(group.isInStates(Collections.singleton("empty"), 1));
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java
index 3310e080744..45bded5bac7 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java
@@ -678,13 +678,13 @@ public class ShareGroupTest {
     public void testAsListedGroup() {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new 
LogContext());
         ShareGroup shareGroup = new ShareGroup(snapshotRegistry, "group-foo");
-        snapshotRegistry.getOrCreateSnapshot(0);
+        snapshotRegistry.idempotentCreateSnapshot(0);
         assertEquals(ShareGroupState.EMPTY, shareGroup.state(0));
         assertEquals("Empty", shareGroup.stateAsString(0));
         shareGroup.updateMember(new ShareGroupMember.Builder("member1")
             .setSubscribedTopicNames(Collections.singletonList("foo"))
             .build());
-        snapshotRegistry.getOrCreateSnapshot(1);
+        snapshotRegistry.idempotentCreateSnapshot(1);
         assertEquals(ShareGroupState.EMPTY, shareGroup.state(0));
         assertEquals("Empty", shareGroup.stateAsString(0));
         assertEquals(ShareGroupState.STABLE, shareGroup.state(1));
@@ -786,7 +786,7 @@ public class ShareGroupTest {
     public void testAsDescribedGroup() {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new 
LogContext());
         ShareGroup shareGroup = new ShareGroup(snapshotRegistry, "group-id-1");
-        snapshotRegistry.getOrCreateSnapshot(0);
+        snapshotRegistry.idempotentCreateSnapshot(0);
         assertEquals(ShareGroupState.EMPTY.toString(), 
shareGroup.stateAsString(0));
 
         shareGroup.updateMember(new ShareGroupMember.Builder("member1")
@@ -794,7 +794,7 @@ public class ShareGroupTest {
                 .build());
         shareGroup.updateMember(new ShareGroupMember.Builder("member2")
                 .build());
-        snapshotRegistry.getOrCreateSnapshot(1);
+        snapshotRegistry.idempotentCreateSnapshot(1);
 
         ShareGroupDescribeResponseData.DescribedGroup expected = new 
ShareGroupDescribeResponseData.DescribedGroup()
             .setGroupId("group-id-1")
@@ -818,14 +818,14 @@ public class ShareGroupTest {
     public void testIsInStatesCaseInsensitive() {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new 
LogContext());
         ShareGroup shareGroup = new ShareGroup(snapshotRegistry, "group-foo");
-        snapshotRegistry.getOrCreateSnapshot(0);
+        snapshotRegistry.idempotentCreateSnapshot(0);
         assertTrue(shareGroup.isInStates(Collections.singleton("empty"), 0));
         assertFalse(shareGroup.isInStates(Collections.singleton("Empty"), 0));
 
         shareGroup.updateMember(new ShareGroupMember.Builder("member1")
             .setSubscribedTopicNames(Collections.singletonList("foo"))
             .build());
-        snapshotRegistry.getOrCreateSnapshot(1);
+        snapshotRegistry.idempotentCreateSnapshot(1);
         assertTrue(shareGroup.isInStates(Collections.singleton("empty"), 0));
         assertTrue(shareGroup.isInStates(Collections.singleton("stable"), 1));
         assertFalse(shareGroup.isInStates(Collections.singleton("empty"), 1));
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/timeline/TimelineHashMapBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/timeline/TimelineHashMapBenchmark.java
index 280215541c0..cc9ae039592 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/timeline/TimelineHashMapBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/timeline/TimelineHashMapBenchmark.java
@@ -78,7 +78,7 @@ public class TimelineHashMapBenchmark {
             int key = (int) (0xffffffff & ((i * 2862933555777941757L) + 
3037000493L));
             if (j > 10 && key % 3 == 0) {
                 snapshotRegistry.deleteSnapshotsUpTo(epoch - 1000);
-                snapshotRegistry.getOrCreateSnapshot(epoch);
+                snapshotRegistry.idempotentCreateSnapshot(epoch);
                 j = 0;
             } else {
                 j++;
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/OffsetControlManager.java 
b/metadata/src/main/java/org/apache/kafka/controller/OffsetControlManager.java
index 4094c34e580..b11072648e0 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/OffsetControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/OffsetControlManager.java
@@ -36,7 +36,7 @@ import java.util.Optional;
 
 /**
  * Manages read and write offsets, and in-memory snapshots.
- *
+ * <p>
  * Also manages the following metrics:
  *      kafka.controller:type=KafkaController,name=ActiveControllerCount
  *      kafka.controller:type=KafkaController,name=LastAppliedRecordLagMs
@@ -45,7 +45,7 @@ import java.util.Optional;
  *      kafka.controller:type=KafkaController,name=LastCommittedRecordOffset
  */
 class OffsetControlManager {
-    public static class Builder {
+    static class Builder {
         private LogContext logContext = null;
         private SnapshotRegistry snapshotRegistry = null;
         private QuorumControllerMetrics metrics = null;
@@ -71,7 +71,7 @@ class OffsetControlManager {
             return this;
         }
 
-        public OffsetControlManager build() {
+        OffsetControlManager build() {
             if (logContext == null) logContext = new LogContext();
             if (snapshotRegistry == null) snapshotRegistry = new 
SnapshotRegistry(logContext);
             if (metrics == null) {
@@ -156,7 +156,7 @@ class OffsetControlManager {
         this.lastStableOffset = -1L;
         this.transactionStartOffset = -1L;
         this.nextWriteOffset = -1L;
-        snapshotRegistry.getOrCreateSnapshot(-1L);
+        snapshotRegistry.idempotentCreateSnapshot(-1L);
         metrics.setActive(false);
         metrics.setLastCommittedRecordOffset(-1L);
         metrics.setLastAppliedRecordOffset(-1L);
@@ -249,7 +249,7 @@ class OffsetControlManager {
         // Before switching to active, create an in-memory snapshot at the 
last committed
         // offset. This is required because the active controller assumes that 
there is always
         // an in-memory snapshot at the last committed offset.
-        snapshotRegistry.getOrCreateSnapshot(lastStableOffset);
+        snapshotRegistry.idempotentCreateSnapshot(lastStableOffset);
         this.nextWriteOffset = newNextWriteOffset;
         metrics.setActive(true);
     }
@@ -298,7 +298,7 @@ class OffsetControlManager {
     void handleScheduleAtomicAppend(long endOffset) {
         this.nextWriteOffset = endOffset + 1;
 
-        snapshotRegistry.getOrCreateSnapshot(endOffset);
+        snapshotRegistry.idempotentCreateSnapshot(endOffset);
 
         metrics.setLastAppliedRecordOffset(endOffset);
 
@@ -323,7 +323,7 @@ class OffsetControlManager {
             lastStableOffset = newLastStableOffset;
             snapshotRegistry.deleteSnapshotsUpTo(lastStableOffset);
             if (!active()) {
-                snapshotRegistry.getOrCreateSnapshot(lastStableOffset);
+                snapshotRegistry.idempotentCreateSnapshot(lastStableOffset);
             }
         }
     }
@@ -362,7 +362,7 @@ class OffsetControlManager {
                     "current snapshot.");
         }
         log.info("Successfully loaded snapshot {}.", currentSnapshotName);
-        this.snapshotRegistry.getOrCreateSnapshot(currentSnapshotId.offset());
+        
this.snapshotRegistry.idempotentCreateSnapshot(currentSnapshotId.offset());
         this.lastCommittedOffset = currentSnapshotId.offset();
         this.lastCommittedEpoch = currentSnapshotId.epoch();
         this.lastStableOffset = currentSnapshotId.offset();
@@ -383,7 +383,7 @@ class OffsetControlManager {
             throw new RuntimeException("Can't replay a BeginTransactionRecord 
at " + offset +
                 " because the transaction at " + transactionStartOffset + " 
was never closed.");
         }
-        snapshotRegistry.getOrCreateSnapshot(offset - 1);
+        snapshotRegistry.idempotentCreateSnapshot(offset - 1);
         transactionStartOffset = offset;
         log.info("Replayed {} at offset {}.", message, offset);
     }
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java 
b/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java
index 8ad6ce2633e..84143c8b3e1 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java
@@ -203,7 +203,7 @@ public class AclControlManagerTest {
     @Test
     public void testLoadSnapshot() {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new 
LogContext());
-        snapshotRegistry.getOrCreateSnapshot(0);
+        snapshotRegistry.idempotentCreateSnapshot(0);
         AclControlManager manager = new AclControlManager.Builder().
             setSnapshotRegistry(snapshotRegistry).
             build();
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
index 32401d27447..4a19f0eb11b 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
@@ -54,7 +54,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 @Timeout(value = 40)
 public class FeatureControlManagerTest {
 
-    @SuppressWarnings("unchecked")
     private static Map<String, VersionRange> rangeMap(Object... args) {
         Map<String, VersionRange> result = new HashMap<>();
         for (int i = 0; i < args.length; i += 3) {
@@ -100,7 +99,7 @@ public class FeatureControlManagerTest {
             setSnapshotRegistry(snapshotRegistry).
             setMetadataVersion(MetadataVersion.IBP_3_3_IV0).
             build();
-        snapshotRegistry.getOrCreateSnapshot(-1);
+        snapshotRegistry.idempotentCreateSnapshot(-1);
         assertEquals(new 
FinalizedControllerFeatures(Collections.singletonMap("metadata.version", 
(short) 4), -1),
             manager.finalizedFeatures(-1));
         assertEquals(ControllerResult.atomicOf(emptyList(), Collections.
@@ -131,7 +130,7 @@ public class FeatureControlManagerTest {
         FeatureLevelRecord record = new FeatureLevelRecord().
             setName("foo").setFeatureLevel((short) 2);
 
-        snapshotRegistry.getOrCreateSnapshot(-1);
+        snapshotRegistry.idempotentCreateSnapshot(-1);
         FeatureControlManager manager = new FeatureControlManager.Builder().
                 setLogContext(logContext).
                 setQuorumFeatures(features("foo", 1, 2)).
@@ -139,7 +138,7 @@ public class FeatureControlManagerTest {
                 setMetadataVersion(MetadataVersion.IBP_3_3_IV0).
                 build();
         manager.replay(record);
-        snapshotRegistry.getOrCreateSnapshot(123);
+        snapshotRegistry.idempotentCreateSnapshot(123);
         assertEquals(new 
FinalizedControllerFeatures(versionMap("metadata.version", 4, "foo", 2), 123),
             manager.finalizedFeatures(123));
     }
@@ -185,7 +184,7 @@ public class FeatureControlManagerTest {
             updateMap("bar", 3), Collections.emptyMap(), false);
         assertEquals(Collections.singletonMap("bar", ApiError.NONE), 
result.response());
         manager.replay((FeatureLevelRecord) result.records().get(0).message());
-        snapshotRegistry.getOrCreateSnapshot(3);
+        snapshotRegistry.idempotentCreateSnapshot(3);
 
         assertEquals(ControllerResult.atomicOf(emptyList(), Collections.
                 singletonMap("bar", new ApiError(Errors.INVALID_UPDATE_VERSION,
@@ -213,7 +212,7 @@ public class FeatureControlManagerTest {
     }
 
     @Test
-    public void testReplayRecords() throws Exception {
+    public void testReplayRecords() {
         LogContext logContext = new LogContext();
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
         FeatureControlManager manager = new FeatureControlManager.Builder().
diff --git 
a/server-common/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java 
b/server-common/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
index b35670a0bcc..52ab96ecbcd 100644
--- 
a/server-common/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
+++ 
b/server-common/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
@@ -119,7 +119,7 @@ public class SnapshotRegistry {
      * Returns a snapshot iterator that iterates from the snapshots with the
      * lowest epoch to those with the highest.
      */
-    public Iterator<Snapshot> iterator() {
+    Iterator<Snapshot> iterator() {
         return new SnapshotIterator(head.next());
     }
 
@@ -128,7 +128,7 @@ public class SnapshotRegistry {
      * lowest epoch to those with the highest, starting at the snapshot with 
the
      * given epoch.
      */
-    public Iterator<Snapshot> iterator(long epoch) {
+    Iterator<Snapshot> iterator(long epoch) {
         return iterator(getSnapshot(epoch));
     }
 
@@ -136,7 +136,7 @@ public class SnapshotRegistry {
      * Returns a snapshot iterator that iterates from the snapshots with the
      * lowest epoch to those with the highest, starting at the given snapshot.
      */
-    public Iterator<Snapshot> iterator(Snapshot snapshot) {
+    Iterator<Snapshot> iterator(Snapshot snapshot) {
         return new SnapshotIterator(snapshot);
     }
 
@@ -144,7 +144,7 @@ public class SnapshotRegistry {
      * Returns a reverse snapshot iterator that iterates from the snapshots 
with the
      * highest epoch to those with the lowest.
      */
-    public Iterator<Snapshot> reverseIterator() {
+    Iterator<Snapshot> reverseIterator() {
         return new ReverseSnapshotIterator();
     }
 
@@ -172,7 +172,7 @@ public class SnapshotRegistry {
     /**
      * Gets the snapshot for a specific epoch.
      */
-    public Snapshot getSnapshot(long epoch) {
+    Snapshot getSnapshot(long epoch) {
         Snapshot snapshot = snapshots.get(epoch);
         if (snapshot == null) {
             throw new RuntimeException("No in-memory snapshot for epoch " + 
epoch + ". Snapshot " +
@@ -183,13 +183,13 @@ public class SnapshotRegistry {
 
     /**
      * Creates a new snapshot at the given epoch.
-     * <br>
-     * If {@code epoch} already exists and it is the last snapshot then just 
return that snapshot.
+     * <p>
+     * If {@code epoch} already exists, and it is the last snapshot then just 
return that snapshot.
      *
-     * @param epoch             The epoch to create the snapshot at.  The 
current epoch
+     * @param epoch             The epoch to create the snapshot at. The 
current epoch
      *                          will be advanced to one past this epoch.
      */
-    public Snapshot getOrCreateSnapshot(long epoch) {
+    Snapshot getOrCreateSnapshot(long epoch) {
         Snapshot last = head.prev();
         if (last.epoch() > epoch) {
             throw new RuntimeException("Can't create a new in-memory snapshot 
at epoch " + epoch +
@@ -205,6 +205,18 @@ public class SnapshotRegistry {
         return snapshot;
     }
 
+    /**
+     * Creates a new snapshot at the given epoch.
+     * <p>
+     * If {@code epoch} already exists, and it is the last snapshot then this 
operation will do nothing.
+     *
+     * @param epoch             The epoch to create the snapshot at. The 
current epoch
+     *                          will be advanced to one past this epoch.
+     */
+    public void idempotentCreateSnapshot(long epoch) {
+        getOrCreateSnapshot(epoch);
+    }
+
     /**
      * Reverts the state of all data structures to the state at the given 
epoch.
      *
@@ -238,7 +250,7 @@ public class SnapshotRegistry {
      *
      * @param snapshot          The snapshot to delete.
      */
-    public void deleteSnapshot(Snapshot snapshot) {
+    void deleteSnapshot(Snapshot snapshot) {
         Snapshot prev = snapshot.prev();
         if (prev != head) {
             prev.mergeFrom(snapshot);
@@ -274,7 +286,7 @@ public class SnapshotRegistry {
     /**
      * Associate a revertable with this registry.
      */
-    public void register(Revertable revertable) {
+    void register(Revertable revertable) {
         revertables.add(revertable);
     }
 
diff --git 
a/server-common/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
 
b/server-common/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
index 8fb3dbd50d3..c080113c9a5 100644
--- 
a/server-common/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
+++ 
b/server-common/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
@@ -29,33 +29,33 @@ import java.util.NoSuchElementException;
  * We handle divergences between the current state and historical state by 
copying a
  * reference to elements that have been deleted or overwritten into the most 
recent
  * snapshot tier.
- * <br>
+ * <p>
  * Note that there are no keys in SnapshottableHashTable, only values.  So it 
more similar
  * to a hash set than a hash map.  The subclasses implement full-featured maps 
and sets
  * using this class as a building block.
- * <br>
+ * <p>
  * Each snapshot tier contains a size and a hash table.  The size reflects the 
size at
  * the time the snapshot was taken.  Note that, as an optimization, snapshot 
tiers will
  * be null if they don't contain anything.  So for example, if snapshot 20 of 
Object O
  * contains the same entries as snapshot 10 of that object, the snapshot 20 
tier for
  * object O will be null.
- * <br>
+ * <p>
  * The current tier's data is stored in the fields inherited from 
BaseHashTable.  It
  * would be conceptually simpler to have a separate BaseHashTable object, but 
since Java
  * doesn't have value types, subclassing is the only way to avoid another 
pointer
  * indirection and the associated extra memory cost.
- * <br>
+ * <p>
  * Note that each element in the hash table contains a start epoch, and a 
value.  The
  * start epoch is there to identify when the object was first inserted.  This 
in turn
  * determines which snapshots it is a member of.
- * <br>
+ * <p>
  * In order to retrieve an object from snapshot E, we start by checking to see 
if the
  * object exists in the "current" hash tier.  If it does, and its startEpoch 
extends back
  * to E, we return that object.  Otherwise, we check all the snapshot tiers, 
starting
  * with E, and ending with the most recent snapshot, to see if the object is 
there.
  * As an optimization, if we encounter the object in a snapshot tier but its 
epoch is too
  * new, we know that its value at epoch E must be null, so we can return that 
immediately.
- * <br>
+ * <p>
  * The class hierarchy looks like this:
  * <pre>
  *        Revertable       BaseHashTable
@@ -66,11 +66,11 @@ import java.util.NoSuchElementException;
  * </pre>
  * BaseHashTable is a simple hash table that uses separate chaining.  The 
interface is
  * pretty bare-bones since this class is not intended to be used directly by 
end-users.
- * <br>
+ * <p>
  * This class, SnapshottableHashTable, has the logic for snapshotting and 
iterating over
  * snapshots.  This is the core of the snapshotted hash table code and handles 
the
  * tiering.
- * <br>
+ * <p>
  * TimelineHashSet and TimelineHashMap are mostly wrappers around this
  * SnapshottableHashTable class.  They implement standard Java APIs for Set 
and Map,
  * respectively.  There's a fair amount of boilerplate for this, but it's 
necessary so
@@ -78,11 +78,11 @@ import java.util.NoSuchElementException;
  * The accessor APIs have two versions -- one that looks at the current state, 
and one
  * that looks at a historical snapshotted state.  Mutation APIs only ever 
mutate the
  * current state.
- * <br>
+ * <p>
  * One very important feature of SnapshottableHashTable is that we support 
iterating
  * over a snapshot even while changes are being made to the current state.  
See the
  * Javadoc for the iterator for more information about how this is 
accomplished.
- * <br>
+ * <p>
  * All of these classes require external synchronization, and don't support 
null keys or
  * values.
  */
@@ -135,7 +135,7 @@ class SnapshottableHashTable<T extends 
SnapshottableHashTable.ElementWithStartEp
 
     /**
      * Iterate over the values that currently exist in the hash table.
-     *
+     * <p>
      * You can use this iterator even if you are making changes to the map.
      * The changes may or may not be visible while you are iterating.
      */
@@ -185,7 +185,7 @@ class SnapshottableHashTable<T extends 
SnapshottableHashTable.ElementWithStartEp
 
     /**
      * Iterate over the values that existed in the hash table during a 
specific snapshot.
-     *
+     * <p>
      * You can use this iterator even if you are making changes to the map.
      * The snapshot is immutable and will always show up the same.
      */
@@ -408,37 +408,6 @@ class SnapshottableHashTable<T extends 
SnapshottableHashTable.ElementWithStartEp
         }
     }
 
-    String snapshottableToDebugString() {
-        StringBuilder bld = new StringBuilder();
-        bld.append(String.format("SnapshottableHashTable{%n"));
-        bld.append("top tier: ");
-        bld.append(baseToDebugString());
-        bld.append(String.format(",%nsnapshot tiers: [%n"));
-        String prefix = "";
-        for (Iterator<Snapshot> iter = snapshotRegistry.iterator(); 
iter.hasNext(); ) {
-            Snapshot snapshot = iter.next();
-            bld.append(prefix);
-            bld.append("epoch ").append(snapshot.epoch()).append(": ");
-            HashTier<T> tier = snapshot.getDelta(this);
-            if (tier == null) {
-                bld.append("null");
-            } else {
-                bld.append("HashTier{");
-                bld.append("size=").append(tier.size);
-                bld.append(", deltaTable=");
-                if (tier.deltaTable == null) {
-                    bld.append("null");
-                } else {
-                    bld.append(tier.deltaTable.baseToDebugString());
-                }
-                bld.append("}");
-            }
-            bld.append(String.format("%n"));
-        }
-        bld.append(String.format("]}%n"));
-        return bld.toString();
-    }
-
     @SuppressWarnings("unchecked")
     @Override
     public void executeRevert(long targetEpoch, Delta delta) {


Reply via email to