This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 05b1380ecbe KAFKA-16897 Move OffsetIndexTest and OffsetMapTest to
storage module (#16244)
05b1380ecbe is described below
commit 05b1380ecbe747a2a14aea6dd4c9992ed40e7eae
Author: Ken Huang <[email protected]>
AuthorDate: Thu Jun 13 07:24:23 2024 +0900
KAFKA-16897 Move OffsetIndexTest and OffsetMapTest to storage module
(#16244)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../scala/unit/kafka/log/OffsetIndexTest.scala | 241 ------------------
.../test/scala/unit/kafka/log/OffsetMapTest.scala | 88 -------
.../storage/internals/log/OffsetIndexTest.java | 269 +++++++++++++++++++++
.../kafka/storage/internals/log/OffsetMapTest.java | 108 +++++++++
4 files changed, 377 insertions(+), 329 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
deleted file mode 100644
index 03159e4450d..00000000000
--- a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
+++ /dev/null
@@ -1,241 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.log
-
-import java.io._
-import java.nio.file.Files
-import java.util
-import java.util.{Collections, Optional}
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api._
-
-import scala.collection._
-import scala.util.Random
-import kafka.utils.TestUtils
-import org.apache.kafka.common.errors.InvalidOffsetException
-import org.apache.kafka.storage.internals.log.{OffsetIndex, OffsetPosition}
-
-
-import scala.annotation.nowarn
-
-class OffsetIndexTest {
-
- var idx: OffsetIndex = _
- val maxEntries = 30
- val baseOffset = 45L
-
- @BeforeEach
- def setup(): Unit = {
- this.idx = new OffsetIndex(nonExistentTempFile(), baseOffset, 30 * 8)
- }
-
- @AfterEach
- def teardown(): Unit = {
- if (this.idx != null)
- this.idx.file.delete()
- }
-
- @nowarn("cat=deprecation")
- @Test
- def randomLookupTest(): Unit = {
- assertEquals(new OffsetPosition(idx.baseOffset, 0), idx.lookup(92L),
- "Not present value should return physical offset 0.")
-
- // append some random values
- val base = idx.baseOffset.toInt + 1
- val size = idx.maxEntries
- val vals: Seq[(Long, Int)] = monotonicSeq(base,
size).map(_.toLong).zip(monotonicSeq(0, size))
- vals.foreach{x => idx.append(x._1, x._2)}
-
- // should be able to find all those values
- for ((logical, physical) <- vals)
- assertEquals(new OffsetPosition(logical, physical), idx.lookup(logical),
- "Should be able to find values that are present.")
-
- // for non-present values we should find the offset of the largest value
less than or equal to this
- val valMap = new immutable.TreeMap[Long, (Long, Int)]() ++ vals.map(p =>
(p._1, p))
- val offsets = (idx.baseOffset until vals.last._1.toInt).toArray
- Collections.shuffle(util.Arrays.asList(offsets))
- for (offset <- offsets.take(30)) {
- val rightAnswer =
- if (offset < valMap.firstKey)
- new OffsetPosition(idx.baseOffset, 0)
- else
- new OffsetPosition(valMap.to(offset).last._1,
valMap.to(offset).last._2._2)
- assertEquals(rightAnswer, idx.lookup(offset),
- "The index should give the same answer as the sorted map")
- }
- }
-
- @Test
- def lookupExtremeCases(): Unit = {
- assertEquals(new OffsetPosition(idx.baseOffset, 0),
idx.lookup(idx.baseOffset),
- "Lookup on empty file")
- for (i <- 0 until idx.maxEntries)
- idx.append(idx.baseOffset + i + 1, i)
- // check first and last entry
- assertEquals(new OffsetPosition(idx.baseOffset, 0),
idx.lookup(idx.baseOffset))
- assertEquals(new OffsetPosition(idx.baseOffset + idx.maxEntries,
idx.maxEntries - 1), idx.lookup(idx.baseOffset + idx.maxEntries))
- }
-
- @Test
- def testEntry(): Unit = {
- for (i <- 0 until idx.maxEntries)
- idx.append(idx.baseOffset + i + 1, i)
- for (i <- 0 until idx.maxEntries)
- assertEquals(new OffsetPosition(idx.baseOffset + i + 1, i), idx.entry(i))
- }
-
- @Test
- def testEntryOverflow(): Unit = {
- assertThrows(classOf[IllegalArgumentException], () => idx.entry(0))
- }
-
- @Test
- def appendTooMany(): Unit = {
- for (i <- 0 until idx.maxEntries) {
- val offset = idx.baseOffset + i + 1
- idx.append(offset, i)
- }
- assertWriteFails("Append should fail on a full index", idx, idx.maxEntries
+ 1, classOf[IllegalArgumentException])
- }
-
- @Test
- def appendOutOfOrder(): Unit = {
- idx.append(51, 0)
- assertThrows(classOf[InvalidOffsetException], () => idx.append(50, 1))
- }
-
- @Test
- def testFetchUpperBoundOffset(): Unit = {
- val first = new OffsetPosition(baseOffset + 0, 0)
- val second = new OffsetPosition(baseOffset + 1, 10)
- val third = new OffsetPosition(baseOffset + 2, 23)
- val fourth = new OffsetPosition(baseOffset + 3, 37)
-
- assertEquals(Optional.empty, idx.fetchUpperBoundOffset(first, 5))
-
- for (offsetPosition <- Seq(first, second, third, fourth))
- idx.append(offsetPosition.offset, offsetPosition.position)
-
- assertEquals(Optional.of(second), idx.fetchUpperBoundOffset(first, 5))
- assertEquals(Optional.of(second), idx.fetchUpperBoundOffset(first, 10))
- assertEquals(Optional.of(third), idx.fetchUpperBoundOffset(first, 23))
- assertEquals(Optional.of(third), idx.fetchUpperBoundOffset(first, 22))
- assertEquals(Optional.of(fourth), idx.fetchUpperBoundOffset(second, 24))
- assertEquals(Optional.empty, idx.fetchUpperBoundOffset(fourth, 1))
- assertEquals(Optional.empty, idx.fetchUpperBoundOffset(first, 200))
- assertEquals(Optional.empty, idx.fetchUpperBoundOffset(second, 200))
- }
-
- @Test
- def testReopen(): Unit = {
- val first = new OffsetPosition(51, 0)
- val sec = new OffsetPosition(52, 1)
- idx.append(first.offset, first.position)
- idx.append(sec.offset, sec.position)
- idx.close()
- val idxRo = new OffsetIndex(idx.file, idx.baseOffset)
- assertEquals(first, idxRo.lookup(first.offset))
- assertEquals(sec, idxRo.lookup(sec.offset))
- assertEquals(sec.offset, idxRo.lastOffset)
- assertEquals(2, idxRo.entries)
- assertWriteFails("Append should fail on read-only index", idxRo, 53,
classOf[IllegalArgumentException])
- }
-
- @Test
- def truncate(): Unit = {
- val idx = new OffsetIndex(nonExistentTempFile(), 0L, 10 * 8)
- idx.truncate()
- for (i <- 1 until 10)
- idx.append(i, i)
-
- // now check the last offset after various truncate points and validate
that we can still append to the index.
- idx.truncateTo(12)
- assertEquals(new OffsetPosition(9, 9), idx.lookup(10),
- "Index should be unchanged by truncate past the end")
- assertEquals(9, idx.lastOffset,
- "9 should be the last entry in the index")
-
- idx.append(10, 10)
- idx.truncateTo(10)
- assertEquals(new OffsetPosition(9, 9), idx.lookup(10),
- "Index should be unchanged by truncate at the end")
- assertEquals(9, idx.lastOffset,
- "9 should be the last entry in the index")
- idx.append(10, 10)
-
- idx.truncateTo(9)
- assertEquals(new OffsetPosition(8, 8), idx.lookup(10),
- "Index should truncate off last entry")
- assertEquals(8, idx.lastOffset,
- "8 should be the last entry in the index")
- idx.append(9, 9)
-
- idx.truncateTo(5)
- assertEquals(new OffsetPosition(4, 4), idx.lookup(10),
- "4 should be the last entry in the index")
- assertEquals(4, idx.lastOffset,
- "4 should be the last entry in the index")
- idx.append(5, 5)
-
- idx.truncate()
- assertEquals(0, idx.entries, "Full truncation should leave no entries")
- idx.append(0, 0)
- }
-
- @Test
- def forceUnmapTest(): Unit = {
- val idx = new OffsetIndex(nonExistentTempFile(), 0L, 10 * 8)
- idx.forceUnmap()
- // mmap should be null after unmap causing lookup to throw a NPE
- assertThrows(classOf[NullPointerException], () => idx.lookup(1))
- }
-
- @Test
- def testSanityLastOffsetEqualToBaseOffset(): Unit = {
- // Test index sanity for the case where the last offset appended to the
index is equal to the base offset
- val baseOffset = 20L
- val idx = new OffsetIndex(nonExistentTempFile(), baseOffset, 10 * 8)
- idx.append(baseOffset, 0)
- idx.sanityCheck()
- }
-
- def assertWriteFails[T](message: String, idx: OffsetIndex, offset: Int,
klass: Class[T]): Unit = {
- val e = assertThrows(classOf[Exception], () => idx.append(offset, 1), ()
=> message)
- assertEquals(klass, e.getClass, "Got an unexpected exception.")
- }
-
- def monotonicSeq(base: Int, len: Int): Seq[Int] = {
- val rand = new Random(1L)
- val vals = new mutable.ArrayBuffer[Int](len)
- var last = base
- for (_ <- 0 until len) {
- last += rand.nextInt(15) + 1
- vals += last
- }
- vals
- }
-
- def nonExistentTempFile(): File = {
- val file = TestUtils.tempFile()
- Files.delete(file.toPath)
- file
- }
-
-}
diff --git a/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala
b/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala
deleted file mode 100644
index ccdf69490e4..00000000000
--- a/core/src/test/scala/unit/kafka/log/OffsetMapTest.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.log
-
-import java.nio._
-import kafka.utils.Exit
-import org.apache.kafka.storage.internals.log.SkimpyOffsetMap
-import org.junit.jupiter.api._
-import org.junit.jupiter.api.Assertions._
-
-class OffsetMapTest {
-
- @Test
- def testBasicValidation(): Unit = {
- validateMap(10)
- validateMap(100)
- validateMap(1000)
- validateMap(5000)
- }
-
- @Test
- def testClear(): Unit = {
- val map = new SkimpyOffsetMap(4000)
- for (i <- 0 until 10)
- map.put(key(i), i)
- for (i <- 0 until 10)
- assertEquals(i.toLong, map.get(key(i)))
- map.clear()
- for (i <- 0 until 10)
- assertEquals(map.get(key(i)), -1L)
- }
-
- @Test
- def testGetWhenFull(): Unit = {
- val map = new SkimpyOffsetMap(4096)
- var i = 37L //any value would do
- while (map.size < map.slots) {
- map.put(key(i), i)
- i = i + 1L
- }
- assertEquals(map.get(key(i)), -1L)
- assertEquals(map.get(key(i-1L)), i-1L)
- }
-
- def key(key: Long) = ByteBuffer.wrap(key.toString.getBytes)
-
- def validateMap(items: Int, loadFactor: Double = 0.5): SkimpyOffsetMap = {
- val map = new SkimpyOffsetMap((items/loadFactor * 24).toInt)
- for (i <- 0 until items)
- map.put(key(i), i)
- for (i <- 0 until items)
- assertEquals(map.get(key(i)), i.toLong)
- map
- }
-
-}
-
-object OffsetMapTest {
- def main(args: Array[String]): Unit = {
- if (args.length != 2) {
- System.err.println("USAGE: java OffsetMapTest size load")
- Exit.exit(1)
- }
- val test = new OffsetMapTest()
- val size = args(0).toInt
- val load = args(1).toDouble
- val start = System.nanoTime
- val map = test.validateMap(size, load)
- val elapsedMs = (System.nanoTime - start) / 1000.0 / 1000.0
- println(s"${map.size} entries in map of size ${map.slots} in $elapsedMs
ms")
- println("Collision rate: %.1f%%".format(100*map.collisionRate))
- }
-}
diff --git
a/storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetIndexTest.java
b/storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetIndexTest.java
new file mode 100644
index 00000000000..c1466b0b0ea
--- /dev/null
+++
b/storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetIndexTest.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.errors.InvalidOffsetException;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Random;
+import java.util.TreeMap;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class OffsetIndexTest {
+
+ private OffsetIndex index;
+ private static final long BASE_OFFSET = 45L;
+
+ @BeforeEach
+ public void setup() throws IOException {
+ index = new OffsetIndex(nonExistentTempFile(), BASE_OFFSET, 30 * 8);
+ }
+
+ @AfterEach
+ public void tearDown() throws IOException {
+ if (Objects.nonNull(index)) {
+ this.index.close();
+ Files.deleteIfExists(index.file().toPath());
+ }
+ }
+
+ @Test
+ public void randomLookupTest() {
+ assertEquals(new OffsetPosition(index.baseOffset(), 0),
index.lookup(92L),
+ "Not present value should return physical offset 0.");
+ int base = (int) (index.baseOffset() + 1);
+ int size = index.maxEntries();
+ Map<Long, Integer> offsetsToPositions = offsetsToPositions(base, size);
+ offsetsToPositions.forEach((offset, position) -> index.append(offset,
position));
+ // should be able to find all those values
+ offsetsToPositions.forEach((offset, position) ->
+ assertEquals(new OffsetPosition(offset, position),
index.lookup(offset),
+ "Should find the correct position for the offset."));
+
+ // for non-present values we should find the offset of the largest
value less than or equal to this
+ TreeMap<Long, OffsetPosition> valMap = new TreeMap<>();
+ for (Map.Entry<Long, Integer> entry : offsetsToPositions.entrySet()) {
+ valMap.put(entry.getKey(), new OffsetPosition(entry.getKey(),
entry.getValue()));
+ }
+
+ List<Integer> offsets = new ArrayList<>();
+ for (long i = index.baseOffset(); i < valMap.lastKey(); i++) {
+ offsets.add((int) i);
+ }
+ Collections.shuffle(offsets);
+
+ for (int offset : offsets.subList(0, 30)) {
+ OffsetPosition rightAnswer;
+ if (offset < valMap.firstKey()) {
+ rightAnswer = new OffsetPosition(index.baseOffset(), 0);
+ } else {
+ Map.Entry<Long, OffsetPosition> lastEntry =
valMap.floorEntry((long) offset);
+ rightAnswer = new OffsetPosition(lastEntry.getKey(),
lastEntry.getValue().position);
+ }
+ assertEquals(rightAnswer, index.lookup(offset),
+ "The index should give the same answer as the sorted map");
+ }
+ }
+
+ @Test
+ public void lookupExtremeCases() {
+ assertEquals(new OffsetPosition(index.baseOffset(), 0),
index.lookup(index.baseOffset()),
+ "Lookup on empty file");
+ for (int i = 0; i < index.maxEntries(); ++i) {
+ index.append(index.baseOffset() + i + 1, i);
+ }
+ // check first and last entry
+ assertEquals(new OffsetPosition(index.baseOffset(), 0),
index.lookup(index.baseOffset()));
+ assertEquals(new OffsetPosition(index.baseOffset() +
index.maxEntries(),
+ index.maxEntries() - 1), index.lookup(index.baseOffset() +
index.maxEntries()));
+ }
+
+ @Test
+ public void testEntry() {
+ for (int i = 0; i < index.maxEntries(); ++i) {
+ index.append(index.baseOffset() + i, i);
+ }
+ for (int i = 0; i < index.maxEntries(); ++i) {
+ assertEquals(new OffsetPosition(index.baseOffset() + i, i),
index.entry(i));
+ }
+ }
+
+ @Test
+ public void testEntryOverflow() {
+ assertThrows(IllegalArgumentException.class, () -> index.entry(0));
+ }
+
+ @Test
+ public void appendTooMany() {
+ for (int i = 0; i < index.maxEntries(); ++i) {
+ long offset = index.baseOffset() + i + 1;
+ index.append(offset, i);
+ }
+ assertWriteFails("Append should fail on a full index",
+ index, index.maxEntries() + 1);
+ }
+
+ @Test
+ public void appendOutOfOrder() {
+ index.append(51, 0);
+ assertThrows(InvalidOffsetException.class, () -> index.append(50, 1));
+ }
+
+ @Test
+ public void testFetchUpperBoundOffset() {
+ OffsetPosition first = new OffsetPosition(BASE_OFFSET, 0);
+ OffsetPosition second = new OffsetPosition(BASE_OFFSET + 1, 10);
+ OffsetPosition third = new OffsetPosition(BASE_OFFSET + 2, 23);
+ OffsetPosition fourth = new OffsetPosition(BASE_OFFSET + 3, 37);
+
+ assertEquals(Optional.empty(), index.fetchUpperBoundOffset(first, 5));
+
+ Stream.of(first, second, third, fourth)
+ .forEach(offsetPosition -> index.append(offsetPosition.offset,
offsetPosition.position));
+
+ assertEquals(Optional.of(second), index.fetchUpperBoundOffset(first,
5));
+ assertEquals(Optional.of(second), index.fetchUpperBoundOffset(first,
10));
+ assertEquals(Optional.of(third), index.fetchUpperBoundOffset(first,
23));
+ assertEquals(Optional.of(third), index.fetchUpperBoundOffset(first,
22));
+ assertEquals(Optional.of(fourth), index.fetchUpperBoundOffset(second,
24));
+ assertEquals(Optional.empty(), index.fetchUpperBoundOffset(fourth, 1));
+ assertEquals(Optional.empty(), index.fetchUpperBoundOffset(first,
200));
+ assertEquals(Optional.empty(), index.fetchUpperBoundOffset(second,
200));
+ }
+
+ @Test
+ public void testReopen() throws IOException {
+ OffsetPosition first = new OffsetPosition(51, 0);
+ OffsetPosition sec = new OffsetPosition(52, 1);
+ index.append(first.offset, first.position);
+ index.append(sec.offset, sec.position);
+ index.close();
+ OffsetIndex idxRo = new OffsetIndex(index.file(), index.baseOffset());
+ assertEquals(first, idxRo.lookup(first.offset));
+ assertEquals(sec, idxRo.lookup(sec.offset));
+ assertEquals(sec.offset, idxRo.lastOffset());
+ assertEquals(2, idxRo.entries());
+ assertWriteFails("Append should fail on read-only index", idxRo, 53);
+ }
+
+ @Test
+ public void truncate() throws IOException {
+ try (OffsetIndex idx = new OffsetIndex(nonExistentTempFile(), 0L, 10 *
8)) {
+ idx.truncate();
+ IntStream.range(1, 10).forEach(i -> idx.append(i, i));
+
+ // now check the last offset after various truncate points and
validate that we can still append to the index.
+ idx.truncateTo(12);
+ assertEquals(new OffsetPosition(9, 9), idx.lookup(10),
+ "Index should be unchanged by truncate past the end");
+ assertEquals(9, idx.lastOffset(),
+ "9 should be the last entry in the index");
+
+ idx.append(10, 10);
+ idx.truncateTo(10);
+ assertEquals(new OffsetPosition(9, 9), idx.lookup(10),
+ "Index should be unchanged by truncate at the end");
+ assertEquals(9, idx.lastOffset(),
+ "9 should be the last entry in the index");
+ idx.append(10, 10);
+
+ idx.truncateTo(9);
+ assertEquals(new OffsetPosition(8, 8), idx.lookup(10),
+ "Index should truncate off last entry");
+ assertEquals(8, idx.lastOffset(),
+ "8 should be the last entry in the index");
+ idx.append(9, 9);
+
+ idx.truncateTo(5);
+ assertEquals(new OffsetPosition(4, 4), idx.lookup(10),
+ "4 should be the last entry in the index");
+ assertEquals(4, idx.lastOffset(),
+ "4 should be the last entry in the index");
+ idx.append(5, 5);
+
+ idx.truncate();
+ assertEquals(0, idx.entries(), "Full truncation should leave no
entries");
+ }
+
+ }
+
+ @Test
+ public void forceUnmapTest() throws IOException {
+ OffsetIndex idx = new OffsetIndex(nonExistentTempFile(), 0L, 10 * 8);
+ idx.forceUnmap();
+ // mmap should be null after unmap causing lookup to throw a NPE
+ assertThrows(NullPointerException.class, () -> idx.lookup(1));
+ assertThrows(NullPointerException.class, idx::close);
+ }
+
+ @Test
+ public void testSanityLastOffsetEqualToBaseOffset() throws IOException {
+ // Test index sanity for the case where the last offset appended to
the index is equal to the base offset
+ long baseOffset = 20L;
+ try (OffsetIndex idx = new OffsetIndex(nonExistentTempFile(),
baseOffset, 10 * 8)) {
+ idx.append(baseOffset, 0);
+ idx.sanityCheck();
+ }
+ }
+
+ private Map<Long, Integer> offsetsToPositions(int base, int len) {
+ List<Integer> positions = monotonicSeq(0, len);
+ return monotonicSeq(base, len)
+ .stream()
+ .map(Long::valueOf)
+ .collect(TreeMap::new, (m, v) -> m.put(v,
positions.remove(0)), Map::putAll);
+ }
+
+ private List<Integer> monotonicSeq(int base, int len) {
+ Random random = new Random();
+ List<Integer> seq = new ArrayList<>(len);
+ int last = base;
+ for (int i = 0; i < len; i++) {
+ last += random.nextInt(15) + 1;
+ seq.add(last);
+ }
+ return seq;
+ }
+
+ private File nonExistentTempFile() throws IOException {
+ File file = TestUtils.tempFile();
+ Files.deleteIfExists(file.toPath());
+ return file;
+ }
+
+ private void assertWriteFails(String message, OffsetIndex idx, int offset)
{
+ Exception e = assertThrows(Exception.class, () -> idx.append(offset,
1), message);
+ assertEquals(IllegalArgumentException.class, e.getClass(), "Got an
unexpected exception.");
+ }
+}
\ No newline at end of file
diff --git
a/storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetMapTest.java
b/storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetMapTest.java
new file mode 100644
index 00000000000..e19ae2d15ab
--- /dev/null
+++
b/storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetMapTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.nio.ByteBuffer;
+import java.util.stream.IntStream;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class OffsetMapTest {
+
+ private static final int MEMORY_SIZE = 4096;
+
+ @ParameterizedTest
+ @ValueSource(ints = {10, 100, 1000, 5000})
+ public void testBasicValidation(int items) throws Exception {
+ SkimpyOffsetMap map = new SkimpyOffsetMap(items * 48);
+ IntStream.range(0, items).forEach(i -> assertDoesNotThrow(() ->
map.put(key(i), i)));
+ for (int i = 0; i < items; i++) {
+ assertEquals(map.get(key(i)), i);
+ }
+ }
+
+ @Test
+ public void testClear() throws Exception {
+ SkimpyOffsetMap map = new SkimpyOffsetMap(MEMORY_SIZE);
+ IntStream.range(0, 10).forEach(i -> assertDoesNotThrow(() ->
map.put(key(i), i)));
+ for (int i = 0; i < 10; i++) {
+ assertEquals(map.get(key(i)), i);
+ }
+ map.clear();
+ for (int i = 0; i < 10; i++) {
+ assertEquals(-1, map.get(key(i)));
+ }
+ }
+
+ @Test
+ public void testGetWhenFull() throws Exception {
+ SkimpyOffsetMap map = new SkimpyOffsetMap(MEMORY_SIZE);
+ int i = 37;
+ while (map.size() < map.slots()) {
+ map.put(key(i), i);
+ i++;
+ }
+ assertEquals(map.get(key(i)), -1);
+ assertEquals(map.get(key(i - 1)), i - 1);
+ }
+
+ @Test
+ public void testUpdateLatestOffset() throws Exception {
+ SkimpyOffsetMap map = new SkimpyOffsetMap(MEMORY_SIZE);
+ int i = 37;
+ while (map.size() < map.slots()) {
+ map.put(key(i), i);
+ i++;
+ }
+ int lastOffsets = 40;
+ assertEquals(map.get(key(i - 1)), i - 1);
+ map.updateLatestOffset(lastOffsets);
+ assertEquals(map.get(key(lastOffsets)), lastOffsets);
+ }
+
+ @Test
+ public void testLatestOffset() throws Exception {
+ SkimpyOffsetMap map = new SkimpyOffsetMap(MEMORY_SIZE);
+ int i = 37;
+ while (map.size() < map.slots()) {
+ map.put(key(i), i);
+ i++;
+ }
+ assertEquals(map.latestOffset(), i - 1);
+ }
+
+ @Test
+ public void testUtilization() throws Exception {
+ SkimpyOffsetMap map = new SkimpyOffsetMap(MEMORY_SIZE);
+ int i = 37;
+ assertEquals(map.utilization(), 0.0);
+ while (map.size() < map.slots()) {
+ map.put(key(i), i);
+ assertEquals(map.utilization(), (double) map.size() / map.slots());
+ i++;
+ }
+ }
+
+ private ByteBuffer key(Integer key) {
+ return ByteBuffer.wrap(key.toString().getBytes());
+ }
+}
\ No newline at end of file