This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch 3.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 50fa5b40a0fbde2341ddca453ac92733314bfe34
Author: Niket <[email protected]>
AuthorDate: Thu Jun 30 21:03:54 2022 -0700

    KAFKA-14035; Fix NPE in `SnapshottableHashTable::mergeFrom()` (#12371)

    The NPE causes the kraft controller to be in an inconsistent state.

    Reviewers: Jason Gustafson <[email protected]>
---
 .../kafka/timeline/SnapshottableHashTable.java     | 22 +++++++++++++---------
 .../kafka/timeline/SnapshottableHashTableTest.java | 19 +++++++++++++++++++
 2 files changed, 32 insertions(+), 9 deletions(-)

diff --git a/metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java b/metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
index cbd0a280fc1..299f65a6f78 100644
--- a/metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
+++ b/metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
@@ -111,15 +111,19 @@ class SnapshottableHashTable<T extends SnapshottableHashTable.ElementWithStartEp
         @Override
         public void mergeFrom(long epoch, Delta source) {
             HashTier<T> other = (HashTier<T>) source;
-            List<T> list = new ArrayList<>();
-            Object[] otherElements = other.deltaTable.baseElements();
-            for (int slot = 0; slot < otherElements.length; slot++) {
-                BaseHashTable.unpackSlot(list, otherElements, slot);
-                for (T element : list) {
-                    // When merging in a later hash tier, we want to keep only the elements
-                    // that were present at our epoch.
-                    if (element.startEpoch() <= epoch) {
-                        deltaTable.baseAddOrReplace(element);
+            // As an optimization, the deltaTable might not exist for a new key
+            // as there is no previous value
+            if (other.deltaTable != null) {
+                List<T> list = new ArrayList<>();
+                Object[] otherElements = other.deltaTable.baseElements();
+                for (int slot = 0; slot < otherElements.length; slot++) {
+                    BaseHashTable.unpackSlot(list, otherElements, slot);
+                    for (T element : list) {
+                        // When merging in a later hash tier, we want to keep only the elements
+                        // that were present at our epoch.
+                        if (element.startEpoch() <= epoch) {
+                            deltaTable.baseAddOrReplace(element);
+                        }
                     }
                 }
             }
diff --git a/metadata/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java b/metadata/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java
index 7f1ddcc3ff5..1b9dd1559ea 100644
--- a/metadata/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java
+++ b/metadata/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java
@@ -97,6 +97,25 @@ public class SnapshottableHashTableTest {
             new SnapshottableHashTable<>(registry, 1);
         assertEquals(0, table.snapshottableSize(Long.MAX_VALUE));
     }
+    @Test
+    public void testDeleteOnEmptyDeltaTable() {
+        // A simple test case to validate the behavior of the TimelineHashSet
+        // when the deltaTable for a snapshot is null
+        SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
+        TimelineHashSet<String> set = new TimelineHashSet<>(registry, 5);
+
+        registry.getOrCreateSnapshot(100);
+        set.add("bar");
+        registry.getOrCreateSnapshot(200);
+        set.add("baz");
+        registry.revertToSnapshot(100);
+        assertTrue(set.isEmpty());
+        set.add("foo");
+        registry.getOrCreateSnapshot(300);
+        set.remove("bar");
+        registry.revertToSnapshot(100);
+        assertTrue(set.isEmpty());
+    }
 
     @Test
     public void testAddAndRemove() {
