This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 05b1380ecbe KAFKA-16897 Move OffsetIndexTest and OffsetMapTest to storage module (#16244)
05b1380ecbe is described below

commit 05b1380ecbe747a2a14aea6dd4c9992ed40e7eae
Author: Ken Huang <[email protected]>
AuthorDate: Thu Jun 13 07:24:23 2024 +0900

    KAFKA-16897 Move OffsetIndexTest and OffsetMapTest to storage module (#16244)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../scala/unit/kafka/log/OffsetIndexTest.scala     | 241 ------------------
 .../test/scala/unit/kafka/log/OffsetMapTest.scala  |  88 -------
 .../storage/internals/log/OffsetIndexTest.java     | 269 +++++++++++++++++++++
 .../kafka/storage/internals/log/OffsetMapTest.java | 108 +++++++++
 4 files changed, 377 insertions(+), 329 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
deleted file mode 100644
index 03159e4450d..00000000000
--- a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
+++ /dev/null
@@ -1,241 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.log
-
-import java.io._
-import java.nio.file.Files
-import java.util
-import java.util.{Collections, Optional}
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api._
-
-import scala.collection._
-import scala.util.Random
-import kafka.utils.TestUtils
-import org.apache.kafka.common.errors.InvalidOffsetException
-import org.apache.kafka.storage.internals.log.{OffsetIndex, OffsetPosition}
-
-
-import scala.annotation.nowarn
-
-class OffsetIndexTest {
-  
-  var idx: OffsetIndex = _
-  val maxEntries = 30
-  val baseOffset = 45L
-  
-  @BeforeEach
-  def setup(): Unit = {
-    this.idx = new OffsetIndex(nonExistentTempFile(), baseOffset, 30 * 8)
-  }
-  
-  @AfterEach
-  def teardown(): Unit = {
-    if (this.idx != null)
-      this.idx.file.delete()
-  }
-
-  @nowarn("cat=deprecation")
-  @Test
-  def randomLookupTest(): Unit = {
-    assertEquals(new OffsetPosition(idx.baseOffset, 0), idx.lookup(92L),
-      "Not present value should return physical offset 0.")
-    
-    // append some random values
-    val base = idx.baseOffset.toInt + 1
-    val size = idx.maxEntries
-    val vals: Seq[(Long, Int)] = monotonicSeq(base, 
size).map(_.toLong).zip(monotonicSeq(0, size))
-    vals.foreach{x => idx.append(x._1, x._2)}
-    
-    // should be able to find all those values
-    for ((logical, physical) <- vals)
-      assertEquals(new OffsetPosition(logical, physical), idx.lookup(logical),
-        "Should be able to find values that are present.")
-      
-    // for non-present values we should find the offset of the largest value 
less than or equal to this 
-    val valMap = new immutable.TreeMap[Long, (Long, Int)]() ++ vals.map(p => 
(p._1, p))
-    val offsets = (idx.baseOffset until vals.last._1.toInt).toArray
-    Collections.shuffle(util.Arrays.asList(offsets))
-    for (offset <- offsets.take(30)) {
-      val rightAnswer = 
-        if (offset < valMap.firstKey)
-          new OffsetPosition(idx.baseOffset, 0)
-        else
-          new OffsetPosition(valMap.to(offset).last._1, 
valMap.to(offset).last._2._2)
-      assertEquals(rightAnswer, idx.lookup(offset),
-        "The index should give the same answer as the sorted map")
-    }
-  }
-  
-  @Test
-  def lookupExtremeCases(): Unit = {
-    assertEquals(new OffsetPosition(idx.baseOffset, 0), 
idx.lookup(idx.baseOffset),
-      "Lookup on empty file")
-    for (i <- 0 until idx.maxEntries)
-      idx.append(idx.baseOffset + i + 1, i)
-    // check first and last entry
-    assertEquals(new OffsetPosition(idx.baseOffset, 0), 
idx.lookup(idx.baseOffset))
-    assertEquals(new OffsetPosition(idx.baseOffset + idx.maxEntries, 
idx.maxEntries - 1), idx.lookup(idx.baseOffset + idx.maxEntries))
-  }
-
-  @Test
-  def testEntry(): Unit = {
-    for (i <- 0 until idx.maxEntries)
-      idx.append(idx.baseOffset + i + 1, i)
-    for (i <- 0 until idx.maxEntries)
-      assertEquals(new OffsetPosition(idx.baseOffset + i + 1, i), idx.entry(i))
-  }
-
-  @Test
-  def testEntryOverflow(): Unit = {
-    assertThrows(classOf[IllegalArgumentException], () => idx.entry(0))
-  }
-  
-  @Test
-  def appendTooMany(): Unit = {
-    for (i <- 0 until idx.maxEntries) {
-      val offset = idx.baseOffset + i + 1
-      idx.append(offset, i)
-    }
-    assertWriteFails("Append should fail on a full index", idx, idx.maxEntries 
+ 1, classOf[IllegalArgumentException])
-  }
-  
-  @Test
-  def appendOutOfOrder(): Unit = {
-    idx.append(51, 0)
-    assertThrows(classOf[InvalidOffsetException], () => idx.append(50, 1))
-  }
-
-  @Test
-  def testFetchUpperBoundOffset(): Unit = {
-    val first = new OffsetPosition(baseOffset + 0, 0)
-    val second = new OffsetPosition(baseOffset + 1, 10)
-    val third = new OffsetPosition(baseOffset + 2, 23)
-    val fourth = new OffsetPosition(baseOffset + 3, 37)
-
-    assertEquals(Optional.empty, idx.fetchUpperBoundOffset(first, 5))
-
-    for (offsetPosition <- Seq(first, second, third, fourth))
-      idx.append(offsetPosition.offset, offsetPosition.position)
-
-    assertEquals(Optional.of(second), idx.fetchUpperBoundOffset(first, 5))
-    assertEquals(Optional.of(second), idx.fetchUpperBoundOffset(first, 10))
-    assertEquals(Optional.of(third), idx.fetchUpperBoundOffset(first, 23))
-    assertEquals(Optional.of(third), idx.fetchUpperBoundOffset(first, 22))
-    assertEquals(Optional.of(fourth), idx.fetchUpperBoundOffset(second, 24))
-    assertEquals(Optional.empty, idx.fetchUpperBoundOffset(fourth, 1))
-    assertEquals(Optional.empty, idx.fetchUpperBoundOffset(first, 200))
-    assertEquals(Optional.empty, idx.fetchUpperBoundOffset(second, 200))
-  }
-
-  @Test
-  def testReopen(): Unit = {
-    val first = new OffsetPosition(51, 0)
-    val sec = new OffsetPosition(52, 1)
-    idx.append(first.offset, first.position)
-    idx.append(sec.offset, sec.position)
-    idx.close()
-    val idxRo = new OffsetIndex(idx.file, idx.baseOffset)
-    assertEquals(first, idxRo.lookup(first.offset))
-    assertEquals(sec, idxRo.lookup(sec.offset))
-    assertEquals(sec.offset, idxRo.lastOffset)
-    assertEquals(2, idxRo.entries)
-    assertWriteFails("Append should fail on read-only index", idxRo, 53, 
classOf[IllegalArgumentException])
-  }
-  
-  @Test
-  def truncate(): Unit = {
-    val idx = new OffsetIndex(nonExistentTempFile(), 0L, 10 * 8)
-    idx.truncate()
-    for (i <- 1 until 10)
-      idx.append(i, i)
-      
-    // now check the last offset after various truncate points and validate 
that we can still append to the index.      
-    idx.truncateTo(12)
-    assertEquals(new OffsetPosition(9, 9), idx.lookup(10),
-      "Index should be unchanged by truncate past the end")
-    assertEquals(9, idx.lastOffset,
-      "9 should be the last entry in the index")
-    
-    idx.append(10, 10)
-    idx.truncateTo(10)
-    assertEquals(new OffsetPosition(9, 9), idx.lookup(10),
-      "Index should be unchanged by truncate at the end")
-    assertEquals(9, idx.lastOffset,
-      "9 should be the last entry in the index")
-    idx.append(10, 10)
-    
-    idx.truncateTo(9)
-    assertEquals(new OffsetPosition(8, 8), idx.lookup(10),
-      "Index should truncate off last entry")
-    assertEquals(8, idx.lastOffset,
-      "8 should be the last entry in the index")
-    idx.append(9, 9)
-    
-    idx.truncateTo(5)
-    assertEquals(new OffsetPosition(4, 4), idx.lookup(10),
-      "4 should be the last entry in the index")
-    assertEquals(4, idx.lastOffset,
-      "4 should be the last entry in the index")
-    idx.append(5, 5)
-    
-    idx.truncate()
-    assertEquals(0, idx.entries, "Full truncation should leave no entries")
-    idx.append(0, 0)
-  }
-
-  @Test
-  def forceUnmapTest(): Unit = {
-    val idx = new OffsetIndex(nonExistentTempFile(), 0L, 10 * 8)
-    idx.forceUnmap()
-    // mmap should be null after unmap causing lookup to throw a NPE
-    assertThrows(classOf[NullPointerException], () => idx.lookup(1))
-  }
-
-  @Test
-  def testSanityLastOffsetEqualToBaseOffset(): Unit = {
-    // Test index sanity for the case where the last offset appended to the 
index is equal to the base offset
-    val baseOffset = 20L
-    val idx = new OffsetIndex(nonExistentTempFile(), baseOffset, 10 * 8)
-    idx.append(baseOffset, 0)
-    idx.sanityCheck()
-  }
-  
-  def assertWriteFails[T](message: String, idx: OffsetIndex, offset: Int, 
klass: Class[T]): Unit = {
-    val e = assertThrows(classOf[Exception], () => idx.append(offset, 1), () 
=> message)
-    assertEquals(klass, e.getClass, "Got an unexpected exception.")
-  }
-
-  def monotonicSeq(base: Int, len: Int): Seq[Int] = {
-    val rand = new Random(1L)
-    val vals = new mutable.ArrayBuffer[Int](len)
-    var last = base
-    for (_ <- 0 until len) {
-      last += rand.nextInt(15) + 1
-      vals += last
-    }
-    vals
-  }
-  
-  def nonExistentTempFile(): File = {
-    val file = TestUtils.tempFile()
-    Files.delete(file.toPath)
-    file
-  }
-
-}
diff --git a/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala b/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala
deleted file mode 100644
index ccdf69490e4..00000000000
--- a/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.log
-
-import java.nio._
-import kafka.utils.Exit
-import org.apache.kafka.storage.internals.log.SkimpyOffsetMap
-import org.junit.jupiter.api._
-import org.junit.jupiter.api.Assertions._
-
-class OffsetMapTest {
-  
-  @Test
-  def testBasicValidation(): Unit = {
-    validateMap(10)
-    validateMap(100)
-    validateMap(1000)
-    validateMap(5000)
-  }
-  
-  @Test
-  def testClear(): Unit = {
-    val map = new SkimpyOffsetMap(4000)
-    for (i <- 0 until 10)
-      map.put(key(i), i)
-    for (i <- 0 until 10)
-      assertEquals(i.toLong, map.get(key(i)))
-    map.clear()
-    for (i <- 0 until 10)
-      assertEquals(map.get(key(i)), -1L)
-  }
-  
-  @Test
-  def testGetWhenFull(): Unit = {
-    val map = new SkimpyOffsetMap(4096)
-    var i = 37L  //any value would do
-    while (map.size < map.slots) {
-      map.put(key(i), i)
-      i = i + 1L
-    }
-    assertEquals(map.get(key(i)), -1L)
-    assertEquals(map.get(key(i-1L)), i-1L)
-  }
-
-  def key(key: Long) = ByteBuffer.wrap(key.toString.getBytes)
-  
-  def validateMap(items: Int, loadFactor: Double = 0.5): SkimpyOffsetMap = {
-    val map = new SkimpyOffsetMap((items/loadFactor * 24).toInt)
-    for (i <- 0 until items)
-      map.put(key(i), i)
-    for (i <- 0 until items)
-      assertEquals(map.get(key(i)), i.toLong)
-    map
-  }
-  
-}
-
-object OffsetMapTest {
-  def main(args: Array[String]): Unit = {
-    if (args.length != 2) {
-      System.err.println("USAGE: java OffsetMapTest size load")
-      Exit.exit(1)
-    }
-    val test = new OffsetMapTest()
-    val size = args(0).toInt
-    val load = args(1).toDouble
-    val start = System.nanoTime
-    val map = test.validateMap(size, load)
-    val elapsedMs = (System.nanoTime - start) / 1000.0 / 1000.0
-    println(s"${map.size} entries in map of size ${map.slots} in $elapsedMs 
ms")
-    println("Collision rate: %.1f%%".format(100*map.collisionRate))
-  }
-}
diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetIndexTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetIndexTest.java
new file mode 100644
index 00000000000..c1466b0b0ea
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetIndexTest.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.errors.InvalidOffsetException;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Random;
+import java.util.TreeMap;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class OffsetIndexTest {
+
+    private OffsetIndex index;
+    private static final long BASE_OFFSET = 45L;
+
+    @BeforeEach
+    public void setup() throws IOException {
+        index = new OffsetIndex(nonExistentTempFile(), BASE_OFFSET, 30 * 8);
+    }
+
+    @AfterEach
+    public void tearDown() throws IOException {
+        if (Objects.nonNull(index)) {
+            this.index.close();
+            Files.deleteIfExists(index.file().toPath());
+        }
+    }
+
+    @Test
+    public void randomLookupTest() {
+        assertEquals(new OffsetPosition(index.baseOffset(), 0), 
index.lookup(92L),
+                "Not present value should return physical offset 0.");
+        int base = (int) (index.baseOffset() + 1);
+        int size = index.maxEntries();
+        Map<Long, Integer> offsetsToPositions = offsetsToPositions(base, size);
+        offsetsToPositions.forEach((offset, position) -> index.append(offset, 
position));
+        // should be able to find all those values
+        offsetsToPositions.forEach((offset, position) ->
+                assertEquals(new OffsetPosition(offset, position), 
index.lookup(offset),
+                        "Should find the correct position for the offset."));
+
+        // for non-present values we should find the offset of the largest 
value less than or equal to this
+        TreeMap<Long, OffsetPosition> valMap = new TreeMap<>();
+        for (Map.Entry<Long, Integer> entry : offsetsToPositions.entrySet()) {
+            valMap.put(entry.getKey(), new OffsetPosition(entry.getKey(), 
entry.getValue()));
+        }
+
+        List<Integer> offsets = new ArrayList<>();
+        for (long i = index.baseOffset(); i < valMap.lastKey(); i++) {
+            offsets.add((int) i);
+        }
+        Collections.shuffle(offsets);
+
+        for (int offset : offsets.subList(0, 30)) {
+            OffsetPosition rightAnswer;
+            if (offset < valMap.firstKey()) {
+                rightAnswer = new OffsetPosition(index.baseOffset(), 0);
+            } else {
+                Map.Entry<Long, OffsetPosition> lastEntry = 
valMap.floorEntry((long) offset);
+                rightAnswer = new OffsetPosition(lastEntry.getKey(), 
lastEntry.getValue().position);
+            }
+            assertEquals(rightAnswer, index.lookup(offset),
+                    "The index should give the same answer as the sorted map");
+        }
+    }
+
+    @Test
+    public void lookupExtremeCases() {
+        assertEquals(new OffsetPosition(index.baseOffset(), 0), 
index.lookup(index.baseOffset()),
+                "Lookup on empty file");
+        for (int i = 0; i < index.maxEntries(); ++i) {
+            index.append(index.baseOffset() + i + 1, i);
+        }
+        // check first and last entry
+        assertEquals(new OffsetPosition(index.baseOffset(), 0), 
index.lookup(index.baseOffset()));
+        assertEquals(new OffsetPosition(index.baseOffset() + 
index.maxEntries(),
+                index.maxEntries() - 1), index.lookup(index.baseOffset() + 
index.maxEntries()));
+    }
+
+    @Test
+    public void testEntry() {
+        for (int i = 0; i < index.maxEntries(); ++i) {
+            index.append(index.baseOffset() + i, i);
+        }
+        for (int i = 0; i < index.maxEntries(); ++i) {
+            assertEquals(new OffsetPosition(index.baseOffset() + i, i), 
index.entry(i));
+        }
+    }
+
+    @Test
+    public void testEntryOverflow() {
+        assertThrows(IllegalArgumentException.class, () -> index.entry(0));
+    }
+
+    @Test
+    public void appendTooMany() {
+        for (int i = 0; i < index.maxEntries(); ++i) {
+            long offset = index.baseOffset() + i + 1;
+            index.append(offset, i);
+        }
+        assertWriteFails("Append should fail on a full index",
+                index, index.maxEntries() + 1);
+    }
+
+    @Test
+    public void appendOutOfOrder() {
+        index.append(51, 0);
+        assertThrows(InvalidOffsetException.class, () -> index.append(50, 1));
+    }
+
+    @Test
+    public void testFetchUpperBoundOffset() {
+        OffsetPosition first = new OffsetPosition(BASE_OFFSET, 0);
+        OffsetPosition second = new OffsetPosition(BASE_OFFSET + 1, 10);
+        OffsetPosition third = new OffsetPosition(BASE_OFFSET + 2, 23);
+        OffsetPosition fourth = new OffsetPosition(BASE_OFFSET + 3, 37);
+
+        assertEquals(Optional.empty(), index.fetchUpperBoundOffset(first, 5));
+
+        Stream.of(first, second, third, fourth)
+                .forEach(offsetPosition -> index.append(offsetPosition.offset, 
offsetPosition.position));
+
+        assertEquals(Optional.of(second), index.fetchUpperBoundOffset(first, 
5));
+        assertEquals(Optional.of(second), index.fetchUpperBoundOffset(first, 
10));
+        assertEquals(Optional.of(third), index.fetchUpperBoundOffset(first, 
23));
+        assertEquals(Optional.of(third), index.fetchUpperBoundOffset(first, 
22));
+        assertEquals(Optional.of(fourth), index.fetchUpperBoundOffset(second, 
24));
+        assertEquals(Optional.empty(), index.fetchUpperBoundOffset(fourth, 1));
+        assertEquals(Optional.empty(), index.fetchUpperBoundOffset(first, 
200));
+        assertEquals(Optional.empty(), index.fetchUpperBoundOffset(second, 
200));
+    }
+
+    @Test
+    public void testReopen() throws IOException {
+        OffsetPosition first = new OffsetPosition(51, 0);
+        OffsetPosition sec = new OffsetPosition(52, 1);
+        index.append(first.offset, first.position);
+        index.append(sec.offset, sec.position);
+        index.close();
+        OffsetIndex idxRo = new OffsetIndex(index.file(), index.baseOffset());
+        assertEquals(first, idxRo.lookup(first.offset));
+        assertEquals(sec, idxRo.lookup(sec.offset));
+        assertEquals(sec.offset, idxRo.lastOffset());
+        assertEquals(2, idxRo.entries());
+        assertWriteFails("Append should fail on read-only index", idxRo, 53);
+    }
+
+    @Test
+    public void truncate() throws IOException {
+        try (OffsetIndex idx = new OffsetIndex(nonExistentTempFile(), 0L, 10 * 
8)) {
+            idx.truncate();
+            IntStream.range(1, 10).forEach(i -> idx.append(i, i));
+
+            // now check the last offset after various truncate points and 
validate that we can still append to the index.
+            idx.truncateTo(12);
+            assertEquals(new OffsetPosition(9, 9), idx.lookup(10),
+                    "Index should be unchanged by truncate past the end");
+            assertEquals(9, idx.lastOffset(),
+                    "9 should be the last entry in the index");
+
+            idx.append(10, 10);
+            idx.truncateTo(10);
+            assertEquals(new OffsetPosition(9, 9), idx.lookup(10),
+                    "Index should be unchanged by truncate at the end");
+            assertEquals(9, idx.lastOffset(),
+                    "9 should be the last entry in the index");
+            idx.append(10, 10);
+
+            idx.truncateTo(9);
+            assertEquals(new OffsetPosition(8, 8), idx.lookup(10),
+                    "Index should truncate off last entry");
+            assertEquals(8, idx.lastOffset(),
+                    "8 should be the last entry in the index");
+            idx.append(9, 9);
+
+            idx.truncateTo(5);
+            assertEquals(new OffsetPosition(4, 4), idx.lookup(10),
+                    "4 should be the last entry in the index");
+            assertEquals(4, idx.lastOffset(),
+                    "4 should be the last entry in the index");
+            idx.append(5, 5);
+
+            idx.truncate();
+            assertEquals(0, idx.entries(), "Full truncation should leave no 
entries");
+        }
+
+    }
+
+    @Test
+    public void forceUnmapTest() throws IOException {
+        OffsetIndex idx = new OffsetIndex(nonExistentTempFile(), 0L, 10 * 8);
+        idx.forceUnmap();
+        // mmap should be null after unmap causing lookup to throw a NPE
+        assertThrows(NullPointerException.class, () -> idx.lookup(1));
+        assertThrows(NullPointerException.class, idx::close);
+    }
+
+    @Test
+    public void testSanityLastOffsetEqualToBaseOffset() throws IOException {
+        // Test index sanity for the case where the last offset appended to 
the index is equal to the base offset
+        long baseOffset = 20L;
+        try (OffsetIndex idx = new OffsetIndex(nonExistentTempFile(), 
baseOffset, 10 * 8)) {
+            idx.append(baseOffset, 0);
+            idx.sanityCheck();
+        }
+    }
+
+    private Map<Long, Integer> offsetsToPositions(int base, int len) {
+        List<Integer> positions = monotonicSeq(0, len);
+        return monotonicSeq(base, len)
+                .stream()
+                .map(Long::valueOf)
+                .collect(TreeMap::new, (m, v) -> m.put(v, 
positions.remove(0)), Map::putAll);
+    }
+
+    private List<Integer> monotonicSeq(int base, int len) {
+        Random random = new Random();
+        List<Integer> seq = new ArrayList<>(len);
+        int last = base;
+        for (int i = 0; i < len; i++) {
+            last += random.nextInt(15) + 1;
+            seq.add(last);
+        }
+        return seq;
+    }
+
+    private File nonExistentTempFile() throws IOException {
+        File file = TestUtils.tempFile();
+        Files.deleteIfExists(file.toPath());
+        return file;
+    }
+
+    private void assertWriteFails(String message, OffsetIndex idx, int offset) 
{
+        Exception e = assertThrows(Exception.class, () -> idx.append(offset, 
1), message);
+        assertEquals(IllegalArgumentException.class, e.getClass(), "Got an 
unexpected exception.");
+    }
+}
\ No newline at end of file
diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetMapTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetMapTest.java
new file mode 100644
index 00000000000..e19ae2d15ab
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetMapTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.nio.ByteBuffer;
+import java.util.stream.IntStream;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class OffsetMapTest {
+
+    private static final int MEMORY_SIZE = 4096;
+
+    @ParameterizedTest
+    @ValueSource(ints = {10, 100, 1000, 5000})
+    public void testBasicValidation(int items) throws Exception {
+        SkimpyOffsetMap map = new SkimpyOffsetMap(items * 48);
+        IntStream.range(0, items).forEach(i -> assertDoesNotThrow(() -> 
map.put(key(i), i)));
+        for (int i = 0; i < items; i++) {
+            assertEquals(map.get(key(i)), i);
+        }
+    }
+
+    @Test
+    public void testClear() throws Exception {
+        SkimpyOffsetMap map = new SkimpyOffsetMap(MEMORY_SIZE);
+        IntStream.range(0, 10).forEach(i -> assertDoesNotThrow(() -> 
map.put(key(i), i)));
+        for (int i = 0; i < 10; i++) {
+            assertEquals(map.get(key(i)), i);
+        }
+        map.clear();
+        for (int i = 0; i < 10; i++) {
+            assertEquals(-1, map.get(key(i)));
+        }
+    }
+
+    @Test
+    public void testGetWhenFull() throws Exception {
+        SkimpyOffsetMap map = new SkimpyOffsetMap(MEMORY_SIZE);
+        int i = 37;
+        while (map.size() < map.slots()) {
+            map.put(key(i), i);
+            i++;
+        }
+        assertEquals(map.get(key(i)), -1);
+        assertEquals(map.get(key(i - 1)), i - 1);
+    }
+
+    @Test
+    public void testUpdateLatestOffset() throws Exception {
+        SkimpyOffsetMap map = new SkimpyOffsetMap(MEMORY_SIZE);
+        int i = 37;
+        while (map.size() < map.slots()) {
+            map.put(key(i), i);
+            i++;
+        }
+        int lastOffsets = 40;
+        assertEquals(map.get(key(i - 1)), i - 1);
+        map.updateLatestOffset(lastOffsets);
+        assertEquals(map.get(key(lastOffsets)), lastOffsets);
+    }
+
+    @Test
+    public void testLatestOffset() throws Exception {
+        SkimpyOffsetMap map = new SkimpyOffsetMap(MEMORY_SIZE);
+        int i = 37;
+        while (map.size() < map.slots()) {
+            map.put(key(i), i);
+            i++;
+        }
+        assertEquals(map.latestOffset(), i - 1);
+    }
+    
+    @Test
+    public void testUtilization() throws Exception {
+        SkimpyOffsetMap map = new SkimpyOffsetMap(MEMORY_SIZE);
+        int i = 37;
+        assertEquals(map.utilization(), 0.0);
+        while (map.size() < map.slots()) {
+            map.put(key(i), i);
+            assertEquals(map.utilization(), (double) map.size() / map.slots());
+            i++;
+        }
+    }
+
+    private ByteBuffer key(Integer key) {
+        return ByteBuffer.wrap(key.toString().getBytes());
+    }
+}
\ No newline at end of file

Reply via email to