This is an automated email from the ASF dual-hosted git repository.
jlprat pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4e862c09031 KAFKA-15875: Stops leak Snapshot in public methods (#16807)
4e862c09031 is described below
commit 4e862c0903195eb11a715271c29304d28036273d
Author: Josep Prat <[email protected]>
AuthorDate: Thu Aug 8 20:05:47 2024 +0200
KAFKA-15875: Stops leak Snapshot in public methods (#16807)
* KAFKA-15875: Stops leak Snapshot in public methods
The Snapshot class is package protected but it's returned in
several public methods in SnapshotRegistry.
To prevent this accidental leakage, these methods are made
package protected as well. For getOrCreateSnapshot a new
method called IdempotentCreateSnapshot is created that returns void.
* Make builer package protected, replace <br> with <p>
Reviewers: Greg Harris <[email protected]>
---
.../group/runtime/SnapshottableCoordinator.java | 4 +-
.../group/GroupMetadataManagerTestContext.java | 4 +-
.../group/OffsetMetadataManagerTest.java | 4 +-
.../metrics/GroupCoordinatorMetricsShardTest.java | 12 ++---
.../group/metrics/GroupCoordinatorMetricsTest.java | 4 +-
.../group/modern/consumer/ConsumerGroupTest.java | 14 +++---
.../group/modern/share/ShareGroupTest.java | 12 ++---
.../jmh/timeline/TimelineHashMapBenchmark.java | 2 +-
.../kafka/controller/OffsetControlManager.java | 18 +++----
.../kafka/controller/AclControlManagerTest.java | 2 +-
.../controller/FeatureControlManagerTest.java | 11 ++---
.../apache/kafka/timeline/SnapshotRegistry.java | 34 ++++++++-----
.../kafka/timeline/SnapshottableHashTable.java | 55 +++++-----------------
13 files changed, 78 insertions(+), 98 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/SnapshottableCoordinator.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/SnapshottableCoordinator.java
index b7f74748c45..467ed84da01 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/SnapshottableCoordinator.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/SnapshottableCoordinator.java
@@ -74,7 +74,7 @@ class SnapshottableCoordinator<S extends CoordinatorShard<U>,
U> implements Coor
this.tp = tp;
this.lastWrittenOffset = 0;
this.lastCommittedOffset = 0;
- snapshotRegistry.getOrCreateSnapshot(0);
+ snapshotRegistry.idempotentCreateSnapshot(0);
}
/**
@@ -145,7 +145,7 @@ class SnapshottableCoordinator<S extends
CoordinatorShard<U>, U> implements Coor
}
lastWrittenOffset = offset;
- snapshotRegistry.getOrCreateSnapshot(offset);
+ snapshotRegistry.idempotentCreateSnapshot(offset);
log.debug("Updated last written offset of {} to {}.", tp, offset);
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
index cef907da2c1..d9add89eb50 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
@@ -532,7 +532,7 @@ public class GroupMetadataManagerTestContext {
this.groupMetadataManager = groupMetadataManager;
this.classicGroupInitialRebalanceDelayMs =
classicGroupInitialRebalanceDelayMs;
this.classicGroupNewMemberJoinTimeoutMs =
classicGroupNewMemberJoinTimeoutMs;
- snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset);
+ snapshotRegistry.idempotentCreateSnapshot(lastWrittenOffset);
}
public void commit() {
@@ -1512,7 +1512,7 @@ public class GroupMetadataManagerTestContext {
}
lastWrittenOffset++;
- snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset);
+ snapshotRegistry.idempotentCreateSnapshot(lastWrittenOffset);
}
void onUnloaded() {
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
index f4334dc6e54..a8d019258f4 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
@@ -481,7 +481,7 @@ public class OffsetMetadataManagerTest {
long producerId,
CoordinatorRecord record
) {
- snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset);
+ snapshotRegistry.idempotentCreateSnapshot(lastWrittenOffset);
ApiMessageAndVersion key = record.key();
ApiMessageAndVersion value = record.value();
@@ -512,7 +512,7 @@ public class OffsetMetadataManagerTest {
long producerId,
TransactionResult result
) {
- snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset);
+ snapshotRegistry.idempotentCreateSnapshot(lastWrittenOffset);
offsetMetadataManager.replayEndTransactionMarker(producerId,
result);
lastWrittenOffset++;
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShardTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShardTest.java
index a68cd4a4ab2..950c359294a 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShardTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShardTest.java
@@ -61,7 +61,7 @@ public class GroupCoordinatorMetricsShardTest {
shard.incrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.STABLE);
shard.incrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.DEAD);
- snapshotRegistry.getOrCreateSnapshot(1000);
+ snapshotRegistry.idempotentCreateSnapshot(1000);
// The value should not be updated until the offset has been committed.
assertEquals(0, shard.numOffsets());
assertEquals(0, shard.numConsumerGroups());
@@ -87,7 +87,7 @@ public class GroupCoordinatorMetricsShardTest {
shard.decrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.STABLE);
shard.decrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.DEAD);
- snapshotRegistry.getOrCreateSnapshot(2000);
+ snapshotRegistry.idempotentCreateSnapshot(2000);
shard.commitUpTo(2000);
assertEquals(0, shard.numOffsets());
assertEquals(0, shard.numConsumerGroups());
@@ -184,7 +184,7 @@ public class GroupCoordinatorMetricsShardTest {
IntStream.range(0, 4).forEach(__ ->
shard.incrementNumConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY));
- snapshotRegistry.getOrCreateSnapshot(1000);
+ snapshotRegistry.idempotentCreateSnapshot(1000);
shard.commitUpTo(1000);
assertEquals(4, shard.numConsumerGroups());
assertEquals(4,
shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY));
@@ -198,7 +198,7 @@ public class GroupCoordinatorMetricsShardTest {
group2.updateMember(member2);
group3.updateMember(member3);
- snapshotRegistry.getOrCreateSnapshot(2000);
+ snapshotRegistry.idempotentCreateSnapshot(2000);
shard.commitUpTo(2000);
assertEquals(0,
shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY));
assertEquals(4,
shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.STABLE));
@@ -206,7 +206,7 @@ public class GroupCoordinatorMetricsShardTest {
group2.setGroupEpoch(1);
group3.setGroupEpoch(1);
- snapshotRegistry.getOrCreateSnapshot(3000);
+ snapshotRegistry.idempotentCreateSnapshot(3000);
shard.commitUpTo(3000);
assertEquals(0,
shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY));
assertEquals(2,
shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.ASSIGNING));
@@ -219,7 +219,7 @@ public class GroupCoordinatorMetricsShardTest {
.setPartitionsPendingRevocation(Collections.singletonMap(Uuid.ZERO_UUID,
Collections.singleton(0)))
.build();
- snapshotRegistry.getOrCreateSnapshot(4000);
+ snapshotRegistry.idempotentCreateSnapshot(4000);
shard.commitUpTo(4000);
assertEquals(0,
shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.EMPTY));
assertEquals(1,
shard.numConsumerGroups(ConsumerGroup.ConsumerGroupState.ASSIGNING));
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java
index 3631a581cd3..31995a71757 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java
@@ -154,8 +154,8 @@ public class GroupCoordinatorMetricsTest {
9
);
- snapshotRegistry0.getOrCreateSnapshot(1000);
- snapshotRegistry1.getOrCreateSnapshot(1500);
+ snapshotRegistry0.idempotentCreateSnapshot(1000);
+ snapshotRegistry1.idempotentCreateSnapshot(1500);
shard0.commitUpTo(1000);
shard1.commitUpTo(1500);
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
index c5cd2ab194c..e82128ab594 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java
@@ -1110,12 +1110,12 @@ public class ConsumerGroupTest {
new TopicPartition("__consumer_offsets", 0)
);
ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo",
metricsShard);
- snapshotRegistry.getOrCreateSnapshot(0);
+ snapshotRegistry.idempotentCreateSnapshot(0);
assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY.toString(),
group.stateAsString(0));
group.updateMember(new ConsumerGroupMember.Builder("member1")
.setSubscribedTopicNames(Collections.singletonList("foo"))
.build());
- snapshotRegistry.getOrCreateSnapshot(1);
+ snapshotRegistry.idempotentCreateSnapshot(1);
assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY.toString(),
group.stateAsString(0));
assertEquals(ConsumerGroup.ConsumerGroupState.STABLE.toString(),
group.stateAsString(1));
}
@@ -1137,7 +1137,7 @@ public class ConsumerGroupTest {
group.validateOffsetFetch("member-id", 0, Long.MAX_VALUE));
// Create a member.
- snapshotRegistry.getOrCreateSnapshot(0);
+ snapshotRegistry.idempotentCreateSnapshot(0);
group.updateMember(new
ConsumerGroupMember.Builder("member-id").build());
// The member does not exist at last committed offset 0.
@@ -1244,7 +1244,7 @@ public class ConsumerGroupTest {
public void testAsDescribedGroup() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new
LogContext());
ConsumerGroup group = new ConsumerGroup(snapshotRegistry,
"group-id-1", mock(GroupCoordinatorMetricsShard.class));
- snapshotRegistry.getOrCreateSnapshot(0);
+ snapshotRegistry.idempotentCreateSnapshot(0);
assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY.toString(),
group.stateAsString(0));
group.updateMember(new ConsumerGroupMember.Builder("member1")
@@ -1253,7 +1253,7 @@ public class ConsumerGroupTest {
.build());
group.updateMember(new ConsumerGroupMember.Builder("member2")
.build());
- snapshotRegistry.getOrCreateSnapshot(1);
+ snapshotRegistry.idempotentCreateSnapshot(1);
ConsumerGroupDescribeResponseData.DescribedGroup expected = new
ConsumerGroupDescribeResponseData.DescribedGroup()
.setGroupId("group-id-1")
@@ -1324,14 +1324,14 @@ public class ConsumerGroupTest {
new TopicPartition("__consumer_offsets", 0)
);
ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo",
metricsShard);
- snapshotRegistry.getOrCreateSnapshot(0);
+ snapshotRegistry.idempotentCreateSnapshot(0);
assertTrue(group.isInStates(Collections.singleton("empty"), 0));
assertFalse(group.isInStates(Collections.singleton("Empty"), 0));
group.updateMember(new ConsumerGroupMember.Builder("member1")
.setSubscribedTopicNames(Collections.singletonList("foo"))
.build());
- snapshotRegistry.getOrCreateSnapshot(1);
+ snapshotRegistry.idempotentCreateSnapshot(1);
assertTrue(group.isInStates(Collections.singleton("empty"), 0));
assertTrue(group.isInStates(Collections.singleton("stable"), 1));
assertFalse(group.isInStates(Collections.singleton("empty"), 1));
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java
index 3310e080744..45bded5bac7 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java
@@ -678,13 +678,13 @@ public class ShareGroupTest {
public void testAsListedGroup() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new
LogContext());
ShareGroup shareGroup = new ShareGroup(snapshotRegistry, "group-foo");
- snapshotRegistry.getOrCreateSnapshot(0);
+ snapshotRegistry.idempotentCreateSnapshot(0);
assertEquals(ShareGroupState.EMPTY, shareGroup.state(0));
assertEquals("Empty", shareGroup.stateAsString(0));
shareGroup.updateMember(new ShareGroupMember.Builder("member1")
.setSubscribedTopicNames(Collections.singletonList("foo"))
.build());
- snapshotRegistry.getOrCreateSnapshot(1);
+ snapshotRegistry.idempotentCreateSnapshot(1);
assertEquals(ShareGroupState.EMPTY, shareGroup.state(0));
assertEquals("Empty", shareGroup.stateAsString(0));
assertEquals(ShareGroupState.STABLE, shareGroup.state(1));
@@ -786,7 +786,7 @@ public class ShareGroupTest {
public void testAsDescribedGroup() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new
LogContext());
ShareGroup shareGroup = new ShareGroup(snapshotRegistry, "group-id-1");
- snapshotRegistry.getOrCreateSnapshot(0);
+ snapshotRegistry.idempotentCreateSnapshot(0);
assertEquals(ShareGroupState.EMPTY.toString(),
shareGroup.stateAsString(0));
shareGroup.updateMember(new ShareGroupMember.Builder("member1")
@@ -794,7 +794,7 @@ public class ShareGroupTest {
.build());
shareGroup.updateMember(new ShareGroupMember.Builder("member2")
.build());
- snapshotRegistry.getOrCreateSnapshot(1);
+ snapshotRegistry.idempotentCreateSnapshot(1);
ShareGroupDescribeResponseData.DescribedGroup expected = new
ShareGroupDescribeResponseData.DescribedGroup()
.setGroupId("group-id-1")
@@ -818,14 +818,14 @@ public class ShareGroupTest {
public void testIsInStatesCaseInsensitive() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new
LogContext());
ShareGroup shareGroup = new ShareGroup(snapshotRegistry, "group-foo");
- snapshotRegistry.getOrCreateSnapshot(0);
+ snapshotRegistry.idempotentCreateSnapshot(0);
assertTrue(shareGroup.isInStates(Collections.singleton("empty"), 0));
assertFalse(shareGroup.isInStates(Collections.singleton("Empty"), 0));
shareGroup.updateMember(new ShareGroupMember.Builder("member1")
.setSubscribedTopicNames(Collections.singletonList("foo"))
.build());
- snapshotRegistry.getOrCreateSnapshot(1);
+ snapshotRegistry.idempotentCreateSnapshot(1);
assertTrue(shareGroup.isInStates(Collections.singleton("empty"), 0));
assertTrue(shareGroup.isInStates(Collections.singleton("stable"), 1));
assertFalse(shareGroup.isInStates(Collections.singleton("empty"), 1));
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/timeline/TimelineHashMapBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/timeline/TimelineHashMapBenchmark.java
index 280215541c0..cc9ae039592 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/timeline/TimelineHashMapBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/timeline/TimelineHashMapBenchmark.java
@@ -78,7 +78,7 @@ public class TimelineHashMapBenchmark {
int key = (int) (0xffffffff & ((i * 2862933555777941757L) +
3037000493L));
if (j > 10 && key % 3 == 0) {
snapshotRegistry.deleteSnapshotsUpTo(epoch - 1000);
- snapshotRegistry.getOrCreateSnapshot(epoch);
+ snapshotRegistry.idempotentCreateSnapshot(epoch);
j = 0;
} else {
j++;
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/OffsetControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/OffsetControlManager.java
index 4094c34e580..b11072648e0 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/OffsetControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/OffsetControlManager.java
@@ -36,7 +36,7 @@ import java.util.Optional;
/**
* Manages read and write offsets, and in-memory snapshots.
- *
+ * <p>
* Also manages the following metrics:
* kafka.controller:type=KafkaController,name=ActiveControllerCount
* kafka.controller:type=KafkaController,name=LastAppliedRecordLagMs
@@ -45,7 +45,7 @@ import java.util.Optional;
* kafka.controller:type=KafkaController,name=LastCommittedRecordOffset
*/
class OffsetControlManager {
- public static class Builder {
+ static class Builder {
private LogContext logContext = null;
private SnapshotRegistry snapshotRegistry = null;
private QuorumControllerMetrics metrics = null;
@@ -71,7 +71,7 @@ class OffsetControlManager {
return this;
}
- public OffsetControlManager build() {
+ OffsetControlManager build() {
if (logContext == null) logContext = new LogContext();
if (snapshotRegistry == null) snapshotRegistry = new
SnapshotRegistry(logContext);
if (metrics == null) {
@@ -156,7 +156,7 @@ class OffsetControlManager {
this.lastStableOffset = -1L;
this.transactionStartOffset = -1L;
this.nextWriteOffset = -1L;
- snapshotRegistry.getOrCreateSnapshot(-1L);
+ snapshotRegistry.idempotentCreateSnapshot(-1L);
metrics.setActive(false);
metrics.setLastCommittedRecordOffset(-1L);
metrics.setLastAppliedRecordOffset(-1L);
@@ -249,7 +249,7 @@ class OffsetControlManager {
// Before switching to active, create an in-memory snapshot at the
last committed
// offset. This is required because the active controller assumes that
there is always
// an in-memory snapshot at the last committed offset.
- snapshotRegistry.getOrCreateSnapshot(lastStableOffset);
+ snapshotRegistry.idempotentCreateSnapshot(lastStableOffset);
this.nextWriteOffset = newNextWriteOffset;
metrics.setActive(true);
}
@@ -298,7 +298,7 @@ class OffsetControlManager {
void handleScheduleAtomicAppend(long endOffset) {
this.nextWriteOffset = endOffset + 1;
- snapshotRegistry.getOrCreateSnapshot(endOffset);
+ snapshotRegistry.idempotentCreateSnapshot(endOffset);
metrics.setLastAppliedRecordOffset(endOffset);
@@ -323,7 +323,7 @@ class OffsetControlManager {
lastStableOffset = newLastStableOffset;
snapshotRegistry.deleteSnapshotsUpTo(lastStableOffset);
if (!active()) {
- snapshotRegistry.getOrCreateSnapshot(lastStableOffset);
+ snapshotRegistry.idempotentCreateSnapshot(lastStableOffset);
}
}
}
@@ -362,7 +362,7 @@ class OffsetControlManager {
"current snapshot.");
}
log.info("Successfully loaded snapshot {}.", currentSnapshotName);
- this.snapshotRegistry.getOrCreateSnapshot(currentSnapshotId.offset());
+
this.snapshotRegistry.idempotentCreateSnapshot(currentSnapshotId.offset());
this.lastCommittedOffset = currentSnapshotId.offset();
this.lastCommittedEpoch = currentSnapshotId.epoch();
this.lastStableOffset = currentSnapshotId.offset();
@@ -383,7 +383,7 @@ class OffsetControlManager {
throw new RuntimeException("Can't replay a BeginTransactionRecord
at " + offset +
" because the transaction at " + transactionStartOffset + "
was never closed.");
}
- snapshotRegistry.getOrCreateSnapshot(offset - 1);
+ snapshotRegistry.idempotentCreateSnapshot(offset - 1);
transactionStartOffset = offset;
log.info("Replayed {} at offset {}.", message, offset);
}
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java
index 8ad6ce2633e..84143c8b3e1 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java
@@ -203,7 +203,7 @@ public class AclControlManagerTest {
@Test
public void testLoadSnapshot() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new
LogContext());
- snapshotRegistry.getOrCreateSnapshot(0);
+ snapshotRegistry.idempotentCreateSnapshot(0);
AclControlManager manager = new AclControlManager.Builder().
setSnapshotRegistry(snapshotRegistry).
build();
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
index 32401d27447..4a19f0eb11b 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
@@ -54,7 +54,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
@Timeout(value = 40)
public class FeatureControlManagerTest {
- @SuppressWarnings("unchecked")
private static Map<String, VersionRange> rangeMap(Object... args) {
Map<String, VersionRange> result = new HashMap<>();
for (int i = 0; i < args.length; i += 3) {
@@ -100,7 +99,7 @@ public class FeatureControlManagerTest {
setSnapshotRegistry(snapshotRegistry).
setMetadataVersion(MetadataVersion.IBP_3_3_IV0).
build();
- snapshotRegistry.getOrCreateSnapshot(-1);
+ snapshotRegistry.idempotentCreateSnapshot(-1);
assertEquals(new
FinalizedControllerFeatures(Collections.singletonMap("metadata.version",
(short) 4), -1),
manager.finalizedFeatures(-1));
assertEquals(ControllerResult.atomicOf(emptyList(), Collections.
@@ -131,7 +130,7 @@ public class FeatureControlManagerTest {
FeatureLevelRecord record = new FeatureLevelRecord().
setName("foo").setFeatureLevel((short) 2);
- snapshotRegistry.getOrCreateSnapshot(-1);
+ snapshotRegistry.idempotentCreateSnapshot(-1);
FeatureControlManager manager = new FeatureControlManager.Builder().
setLogContext(logContext).
setQuorumFeatures(features("foo", 1, 2)).
@@ -139,7 +138,7 @@ public class FeatureControlManagerTest {
setMetadataVersion(MetadataVersion.IBP_3_3_IV0).
build();
manager.replay(record);
- snapshotRegistry.getOrCreateSnapshot(123);
+ snapshotRegistry.idempotentCreateSnapshot(123);
assertEquals(new
FinalizedControllerFeatures(versionMap("metadata.version", 4, "foo", 2), 123),
manager.finalizedFeatures(123));
}
@@ -185,7 +184,7 @@ public class FeatureControlManagerTest {
updateMap("bar", 3), Collections.emptyMap(), false);
assertEquals(Collections.singletonMap("bar", ApiError.NONE),
result.response());
manager.replay((FeatureLevelRecord) result.records().get(0).message());
- snapshotRegistry.getOrCreateSnapshot(3);
+ snapshotRegistry.idempotentCreateSnapshot(3);
assertEquals(ControllerResult.atomicOf(emptyList(), Collections.
singletonMap("bar", new ApiError(Errors.INVALID_UPDATE_VERSION,
@@ -213,7 +212,7 @@ public class FeatureControlManagerTest {
}
@Test
- public void testReplayRecords() throws Exception {
+ public void testReplayRecords() {
LogContext logContext = new LogContext();
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
FeatureControlManager manager = new FeatureControlManager.Builder().
diff --git
a/server-common/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
b/server-common/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
index b35670a0bcc..52ab96ecbcd 100644
---
a/server-common/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
+++
b/server-common/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
@@ -119,7 +119,7 @@ public class SnapshotRegistry {
* Returns a snapshot iterator that iterates from the snapshots with the
* lowest epoch to those with the highest.
*/
- public Iterator<Snapshot> iterator() {
+ Iterator<Snapshot> iterator() {
return new SnapshotIterator(head.next());
}
@@ -128,7 +128,7 @@ public class SnapshotRegistry {
* lowest epoch to those with the highest, starting at the snapshot with
the
* given epoch.
*/
- public Iterator<Snapshot> iterator(long epoch) {
+ Iterator<Snapshot> iterator(long epoch) {
return iterator(getSnapshot(epoch));
}
@@ -136,7 +136,7 @@ public class SnapshotRegistry {
* Returns a snapshot iterator that iterates from the snapshots with the
* lowest epoch to those with the highest, starting at the given snapshot.
*/
- public Iterator<Snapshot> iterator(Snapshot snapshot) {
+ Iterator<Snapshot> iterator(Snapshot snapshot) {
return new SnapshotIterator(snapshot);
}
@@ -144,7 +144,7 @@ public class SnapshotRegistry {
* Returns a reverse snapshot iterator that iterates from the snapshots
with the
* highest epoch to those with the lowest.
*/
- public Iterator<Snapshot> reverseIterator() {
+ Iterator<Snapshot> reverseIterator() {
return new ReverseSnapshotIterator();
}
@@ -172,7 +172,7 @@ public class SnapshotRegistry {
/**
* Gets the snapshot for a specific epoch.
*/
- public Snapshot getSnapshot(long epoch) {
+ Snapshot getSnapshot(long epoch) {
Snapshot snapshot = snapshots.get(epoch);
if (snapshot == null) {
throw new RuntimeException("No in-memory snapshot for epoch " +
epoch + ". Snapshot " +
@@ -183,13 +183,13 @@ public class SnapshotRegistry {
/**
* Creates a new snapshot at the given epoch.
- * <br>
- * If {@code epoch} already exists and it is the last snapshot then just
return that snapshot.
+ * <p>
+ * If {@code epoch} already exists, and it is the last snapshot then just
return that snapshot.
*
- * @param epoch The epoch to create the snapshot at. The
current epoch
+ * @param epoch The epoch to create the snapshot at. The
current epoch
* will be advanced to one past this epoch.
*/
- public Snapshot getOrCreateSnapshot(long epoch) {
+ Snapshot getOrCreateSnapshot(long epoch) {
Snapshot last = head.prev();
if (last.epoch() > epoch) {
throw new RuntimeException("Can't create a new in-memory snapshot
at epoch " + epoch +
@@ -205,6 +205,18 @@ public class SnapshotRegistry {
return snapshot;
}
+ /**
+ * Creates a new snapshot at the given epoch.
+ * <p>
+ * If {@code epoch} already exists, and it is the last snapshot then this
operation will do nothing.
+ *
+ * @param epoch The epoch to create the snapshot at. The
current epoch
+ * will be advanced to one past this epoch.
+ */
+ public void idempotentCreateSnapshot(long epoch) {
+ getOrCreateSnapshot(epoch);
+ }
+
/**
* Reverts the state of all data structures to the state at the given
epoch.
*
@@ -238,7 +250,7 @@ public class SnapshotRegistry {
*
* @param snapshot The snapshot to delete.
*/
- public void deleteSnapshot(Snapshot snapshot) {
+ void deleteSnapshot(Snapshot snapshot) {
Snapshot prev = snapshot.prev();
if (prev != head) {
prev.mergeFrom(snapshot);
@@ -274,7 +286,7 @@ public class SnapshotRegistry {
/**
* Associate a revertable with this registry.
*/
- public void register(Revertable revertable) {
+ void register(Revertable revertable) {
revertables.add(revertable);
}
diff --git
a/server-common/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
b/server-common/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
index 8fb3dbd50d3..c080113c9a5 100644
---
a/server-common/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
+++
b/server-common/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
@@ -29,33 +29,33 @@ import java.util.NoSuchElementException;
* We handle divergences between the current state and historical state by
copying a
* reference to elements that have been deleted or overwritten into the most
recent
* snapshot tier.
- * <br>
+ * <p>
* Note that there are no keys in SnapshottableHashTable, only values. So it
more similar
* to a hash set than a hash map. The subclasses implement full-featured maps
and sets
* using this class as a building block.
- * <br>
+ * <p>
* Each snapshot tier contains a size and a hash table. The size reflects the
size at
* the time the snapshot was taken. Note that, as an optimization, snapshot
tiers will
* be null if they don't contain anything. So for example, if snapshot 20 of
Object O
* contains the same entries as snapshot 10 of that object, the snapshot 20
tier for
* object O will be null.
- * <br>
+ * <p>
* The current tier's data is stored in the fields inherited from
BaseHashTable. It
* would be conceptually simpler to have a separate BaseHashTable object, but
since Java
* doesn't have value types, subclassing is the only way to avoid another
pointer
* indirection and the associated extra memory cost.
- * <br>
+ * <p>
* Note that each element in the hash table contains a start epoch, and a
value. The
* start epoch is there to identify when the object was first inserted. This
in turn
* determines which snapshots it is a member of.
- * <br>
+ * <p>
* In order to retrieve an object from snapshot E, we start by checking to see
if the
* object exists in the "current" hash tier. If it does, and its startEpoch
extends back
* to E, we return that object. Otherwise, we check all the snapshot tiers,
starting
* with E, and ending with the most recent snapshot, to see if the object is
there.
* As an optimization, if we encounter the object in a snapshot tier but its
epoch is too
* new, we know that its value at epoch E must be null, so we can return that
immediately.
- * <br>
+ * <p>
* The class hierarchy looks like this:
* <pre>
* Revertable BaseHashTable
@@ -66,11 +66,11 @@ import java.util.NoSuchElementException;
* </pre>
* BaseHashTable is a simple hash table that uses separate chaining. The
interface is
* pretty bare-bones since this class is not intended to be used directly by
end-users.
- * <br>
+ * <p>
* This class, SnapshottableHashTable, has the logic for snapshotting and
iterating over
* snapshots. This is the core of the snapshotted hash table code and handles
the
* tiering.
- * <br>
+ * <p>
* TimelineHashSet and TimelineHashMap are mostly wrappers around this
* SnapshottableHashTable class. They implement standard Java APIs for Set
and Map,
* respectively. There's a fair amount of boilerplate for this, but it's
necessary so
@@ -78,11 +78,11 @@ import java.util.NoSuchElementException;
* The accessor APIs have two versions -- one that looks at the current state,
and one
* that looks at a historical snapshotted state. Mutation APIs only ever
mutate the
* current state.
- * <br>
+ * <p>
* One very important feature of SnapshottableHashTable is that we support
iterating
* over a snapshot even while changes are being made to the current state.
See the
* Javadoc for the iterator for more information about how this is
accomplished.
- * <br>
+ * <p>
* All of these classes require external synchronization, and don't support
null keys or
* values.
*/
@@ -135,7 +135,7 @@ class SnapshottableHashTable<T extends
SnapshottableHashTable.ElementWithStartEp
/**
* Iterate over the values that currently exist in the hash table.
- *
+ * <p>
* You can use this iterator even if you are making changes to the map.
* The changes may or may not be visible while you are iterating.
*/
@@ -185,7 +185,7 @@ class SnapshottableHashTable<T extends
SnapshottableHashTable.ElementWithStartEp
/**
* Iterate over the values that existed in the hash table during a
specific snapshot.
- *
+ * <p>
* You can use this iterator even if you are making changes to the map.
* The snapshot is immutable and will always show up the same.
*/
@@ -408,37 +408,6 @@ class SnapshottableHashTable<T extends
SnapshottableHashTable.ElementWithStartEp
}
}
- String snapshottableToDebugString() {
- StringBuilder bld = new StringBuilder();
- bld.append(String.format("SnapshottableHashTable{%n"));
- bld.append("top tier: ");
- bld.append(baseToDebugString());
- bld.append(String.format(",%nsnapshot tiers: [%n"));
- String prefix = "";
- for (Iterator<Snapshot> iter = snapshotRegistry.iterator();
iter.hasNext(); ) {
- Snapshot snapshot = iter.next();
- bld.append(prefix);
- bld.append("epoch ").append(snapshot.epoch()).append(": ");
- HashTier<T> tier = snapshot.getDelta(this);
- if (tier == null) {
- bld.append("null");
- } else {
- bld.append("HashTier{");
- bld.append("size=").append(tier.size);
- bld.append(", deltaTable=");
- if (tier.deltaTable == null) {
- bld.append("null");
- } else {
- bld.append(tier.deltaTable.baseToDebugString());
- }
- bld.append("}");
- }
- bld.append(String.format("%n"));
- }
- bld.append(String.format("]}%n"));
- return bld.toString();
- }
-
@SuppressWarnings("unchecked")
@Override
public void executeRevert(long targetEpoch, Delta delta) {