This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 7e27a78f491 KAFKA-19390 Call safeForceUnmap() in
AbstractIndex.resize() on Linux to prevent stale mmap of index files (#20621)
7e27a78f491 is described below
commit 7e27a78f491c9df0bf41538e20e1254bd0c3dfe8
Author: Masahiro Mori <[email protected]>
AuthorDate: Mon Oct 6 23:28:03 2025 +0900
KAFKA-19390 Call safeForceUnmap() in AbstractIndex.resize() on Linux to
prevent stale mmap of index files (#20621)
This backports
[KAFKA-19390](https://issues.apache.org/jira/browse/KAFKA-19390) to
kafka v4.0.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../org/apache/kafka/server/util/LockUtils.java | 84 +++++++++++++
.../kafka/storage/internals/log/AbstractIndex.java | 130 +++++++++++----------
.../kafka/storage/internals/log/OffsetIndex.java | 38 +++---
.../kafka/storage/internals/log/TimeIndex.java | 49 +++-----
.../storage/internals/log/AbstractIndexTest.java | 89 ++++++++++++++
.../storage/internals/log/OffsetIndexTest.java | 2 +-
6 files changed, 272 insertions(+), 120 deletions(-)
diff --git
a/server-common/src/main/java/org/apache/kafka/server/util/LockUtils.java
b/server-common/src/main/java/org/apache/kafka/server/util/LockUtils.java
new file mode 100644
index 00000000000..86338726d5e
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/server/util/LockUtils.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import java.util.Objects;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * A utility class providing helper methods for working with {@link Lock}
objects.
+ * This class simplifies the usage of locks by encapsulating common patterns,
+ * such as acquiring and releasing locks in a safe manner.
+ */
+public class LockUtils {
+ @FunctionalInterface
+ public interface ThrowingSupplier<T, E extends Exception> {
+ T get() throws E;
+ }
+ @FunctionalInterface
+ public interface ThrowingRunnable<E extends Exception> {
+ void run() throws E;
+ }
+
+ /**
+ * Executes the given {@link ThrowingSupplier} within the context of the
specified {@link Lock}.
+ * The lock is acquired before executing the supplier and released after
the execution,
+ * ensuring that the lock is always released, even if an exception is
thrown.
+ *
+ * @param <T> the type of the result returned by the supplier
+ * @param <E> the type of exception that may be thrown by the supplier
+ * @param lock the lock to be acquired and released
+ * @param supplier the supplier to be executed within the lock context
+ * @return the result of the supplier
+ * @throws E if an exception occurs during the execution of the supplier
+ * @throws NullPointerException if either {@code lock} or {@code supplier}
is null
+ */
+ public static <T, E extends Exception> T inLock(Lock lock,
ThrowingSupplier<T, E> supplier) throws E {
+ Objects.requireNonNull(lock, "Lock must not be null");
+ Objects.requireNonNull(supplier, "Supplier must not be null");
+
+ lock.lock();
+ try {
+ return supplier.get();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Executes the given {@link ThrowingRunnable} within the context of the
specified {@link Lock}.
+ * The lock is acquired before executing the runnable and released after
the execution,
+ * ensuring that the lock is always released, even if an exception is
thrown.
+ *
+ * @param <E> the type of exception that may be thrown by the runnable
+ * @param lock the lock to be acquired and released
+ * @param runnable the runnable to be executed within the lock context
+ * @throws E if an exception occurs during the execution of the runnable
+ * @throws NullPointerException if either {@code lock} or {@code runnable}
is null
+ */
+ public static <E extends Exception> void inLock(Lock lock,
ThrowingRunnable<E> runnable) throws E {
+ Objects.requireNonNull(lock, "Lock must not be null");
+ Objects.requireNonNull(runnable, "Runnable must not be null");
+
+ lock.lock();
+ try {
+ runnable.run();
+ } finally {
+ lock.unlock();
+ }
+ }
+}
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java
index 46ceb4801a2..f29f87b023d 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java
@@ -17,8 +17,8 @@
package org.apache.kafka.storage.internals.log;
import org.apache.kafka.common.utils.ByteBufferUnmapper;
-import org.apache.kafka.common.utils.OperatingSystem;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.LockUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,8 +33,8 @@ import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.util.Objects;
import java.util.OptionalInt;
-import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* The abstract index class which holds entry format agnostic methods.
@@ -47,7 +47,18 @@ public abstract class AbstractIndex implements Closeable {
private static final Logger log =
LoggerFactory.getLogger(AbstractIndex.class);
- protected final ReentrantLock lock = new ReentrantLock();
+ // Serializes all index operations that mutate internal state.
+ // Readers do not need to acquire this lock because:
+ // 1) MappedByteBuffer provides direct access to the OS-level buffer
cache,
+ // which allows concurrent reads in practice.
+ // 2) Clients only read committed data and are not affected by concurrent
appends/truncates.
+ // In the rare case when the data is truncated, the follower could
read inconsistent data.
+ // The follower has the logic to ignore the inconsistent data through
crc and leader epoch.
+ // 3) Read and remap operations are coordinated via remapLock to ensure
visibility of the
+ // underlying mmap.
+ private final ReentrantLock lock = new ReentrantLock();
+ // Allows concurrent read operations while ensuring exclusive access if
the underlying mmap is changed
+ private final ReentrantReadWriteLock remapLock = new
ReentrantReadWriteLock();
private final long baseOffset;
private final int maxIndexSize;
@@ -187,36 +198,32 @@ public abstract class AbstractIndex implements Closeable {
* @return a boolean indicating whether the size of the memory map and the
underneath file is changed or not.
*/
public boolean resize(int newSize) throws IOException {
- lock.lock();
- try {
- int roundedNewSize = roundDownToExactMultiple(newSize,
entrySize());
-
- if (length == roundedNewSize) {
- log.debug("Index {} was not resized because it already has
size {}", file.getAbsolutePath(), roundedNewSize);
- return false;
- } else {
- RandomAccessFile raf = new RandomAccessFile(file, "rw");
- try {
- int position = mmap.position();
-
- /* Windows or z/OS won't let us modify the file length
while the file is mmapped :-( */
- if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS)
- safeForceUnmap();
- raf.setLength(roundedNewSize);
- this.length = roundedNewSize;
- mmap =
raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize);
- this.maxEntries = mmap.limit() / entrySize();
- mmap.position(position);
- log.debug("Resized {} to {}, position is {} and limit is
{}", file.getAbsolutePath(), roundedNewSize,
- mmap.position(), mmap.limit());
- return true;
- } finally {
- Utils.closeQuietly(raf, "index file " + file.getName());
- }
- }
- } finally {
- lock.unlock();
- }
+ return inLock(() ->
+ inRemapWriteLock(() -> {
+ int roundedNewSize = roundDownToExactMultiple(newSize,
entrySize());
+
+ if (length == roundedNewSize) {
+ log.debug("Index {} was not resized because it already
has size {}", file.getAbsolutePath(), roundedNewSize);
+ return false;
+ } else {
+ RandomAccessFile raf = new RandomAccessFile(file,
"rw");
+ try {
+ int position = mmap.position();
+
+ safeForceUnmap();
+ raf.setLength(roundedNewSize);
+ this.length = roundedNewSize;
+ mmap =
raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize);
+ this.maxEntries = mmap.limit() / entrySize();
+ mmap.position(position);
+ log.debug("Resized {} to {}, position is {} and
limit is {}", file.getAbsolutePath(), roundedNewSize,
+ mmap.position(), mmap.limit());
+ return true;
+ } finally {
+ Utils.closeQuietly(raf, "index file " +
file.getName());
+ }
+ }
+ }));
}
/**
@@ -236,12 +243,9 @@ public abstract class AbstractIndex implements Closeable {
* Flush the data in the index to disk
*/
public void flush() {
- lock.lock();
- try {
+ inLock(() -> {
mmap.force();
- } finally {
- lock.unlock();
- }
+ });
}
/**
@@ -261,14 +265,11 @@ public abstract class AbstractIndex implements Closeable {
* the file.
*/
public void trimToValidSize() throws IOException {
- lock.lock();
- try {
+ inLock(() -> {
if (mmap != null) {
resize(entrySize() * entries);
}
- } finally {
- lock.unlock();
- }
+ });
}
/**
@@ -288,12 +289,7 @@ public abstract class AbstractIndex implements Closeable {
// However, in some cases it can pause application threads(STW) for a
long moment reading metadata from a physical disk.
// To prevent this, we forcefully cleanup memory mapping within proper
execution which never affects API responsiveness.
// See https://issues.apache.org/jira/browse/KAFKA-4614 for the
details.
- lock.lock();
- try {
- safeForceUnmap();
- } finally {
- lock.unlock();
- }
+ inLock(() -> inRemapWriteLock(this::safeForceUnmap));
}
/**
@@ -420,20 +416,28 @@ public abstract class AbstractIndex implements Closeable {
mmap.position(entries * entrySize());
}
- /**
- * Execute the given function in a lock only if we are running on windows
or z/OS. We do this
- * because Windows or z/OS won't let us resize a file while it is mmapped.
As a result we have to force unmap it
- * and this requires synchronizing reads.
- */
- protected final <T, E extends Exception> T maybeLock(Lock lock,
StorageAction<T, E> action) throws E {
- if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS)
- lock.lock();
- try {
- return action.execute();
- } finally {
- if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS)
- lock.unlock();
- }
+ protected final <T, E extends Exception> T
inLock(LockUtils.ThrowingSupplier<T, E> action) throws E {
+ return LockUtils.inLock(lock, action);
+ }
+
+ protected final <E extends Exception> void
inLock(LockUtils.ThrowingRunnable<E> action) throws E {
+ LockUtils.inLock(lock, action);
+ }
+
+ protected final <T, E extends Exception> T
inRemapReadLock(LockUtils.ThrowingSupplier<T, E> action) throws E {
+ return LockUtils.inLock(remapLock.readLock(), action);
+ }
+
+ protected final <E extends Exception> void
inRemapReadLock(LockUtils.ThrowingRunnable<E> action) throws E {
+ LockUtils.inLock(remapLock.readLock(), action);
+ }
+
+ protected final <T, E extends Exception> T
inRemapWriteLock(LockUtils.ThrowingSupplier<T, E> action) throws E {
+ return LockUtils.inLock(remapLock.writeLock(), action);
+ }
+
+ protected final <E extends Exception> void
inRemapWriteLock(LockUtils.ThrowingRunnable<E> action) throws E {
+ LockUtils.inLock(remapLock.writeLock(), action);
}
/**
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetIndex.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetIndex.java
index fe872a5cd7f..7d20edf37d3 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetIndex.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetIndex.java
@@ -56,7 +56,7 @@ public final class OffsetIndex extends AbstractIndex {
private static final int ENTRY_SIZE = 8;
/* the last offset in the index */
- private long lastOffset;
+ private volatile long lastOffset;
public OffsetIndex(File file, long baseOffset) throws IOException {
this(file, baseOffset, -1);
@@ -95,7 +95,7 @@ public final class OffsetIndex extends AbstractIndex {
* the pair (baseOffset, 0) is returned.
*/
public OffsetPosition lookup(long targetOffset) {
- return maybeLock(lock, () -> {
+ return inRemapReadLock(() -> {
ByteBuffer idx = mmap().duplicate();
int slot = largestLowerBoundSlotFor(idx, targetOffset,
IndexSearchType.KEY);
if (slot == -1)
@@ -111,7 +111,7 @@ public final class OffsetIndex extends AbstractIndex {
* @return The offset/position pair at that entry
*/
public OffsetPosition entry(int n) {
- return maybeLock(lock, () -> {
+ return inRemapReadLock(() -> {
if (n >= entries())
throw new IllegalArgumentException("Attempt to fetch the " + n
+ "th entry from index " +
file().getAbsolutePath() + ", which has size " +
entries());
@@ -125,7 +125,7 @@ public final class OffsetIndex extends AbstractIndex {
* such offset.
*/
public Optional<OffsetPosition> fetchUpperBoundOffset(OffsetPosition
fetchOffset, int fetchSize) {
- return maybeLock(lock, () -> {
+ return inRemapReadLock(() -> {
ByteBuffer idx = mmap().duplicate();
int slot = smallestUpperBoundSlotFor(idx, fetchOffset.position +
fetchSize, IndexSearchType.VALUE);
if (slot == -1)
@@ -141,8 +141,7 @@ public final class OffsetIndex extends AbstractIndex {
* @throws InvalidOffsetException if provided offset is not larger than
the last offset
*/
public void append(long offset, int position) {
- lock.lock();
- try {
+ inLock(() -> {
if (isFull())
throw new IllegalArgumentException("Attempt to append to a
full index (size = " + entries() + ").");
@@ -157,15 +156,12 @@ public final class OffsetIndex extends AbstractIndex {
} else
throw new InvalidOffsetException("Attempt to append an offset
" + offset + " to position " + entries() +
" no larger than the last offset appended (" + lastOffset
+ ") to " + file().getAbsolutePath());
- } finally {
- lock.unlock();
- }
+ });
}
@Override
public void truncateTo(long offset) {
- lock.lock();
- try {
+ inLock(() -> {
ByteBuffer idx = mmap().duplicate();
int slot = largestLowerBoundSlotFor(idx, offset,
IndexSearchType.KEY);
@@ -182,9 +178,7 @@ public final class OffsetIndex extends AbstractIndex {
else
newEntries = slot + 1;
truncateToEntries(newEntries);
- } finally {
- lock.unlock();
- }
+ });
}
public long lastOffset() {
@@ -218,30 +212,24 @@ public final class OffsetIndex extends AbstractIndex {
* Truncates index to a known number of entries.
*/
private void truncateToEntries(int entries) {
- lock.lock();
- try {
+ inLock(() -> {
super.truncateToEntries0(entries);
this.lastOffset = lastEntry().offset;
log.debug("Truncated index {} to {} entries; position is now {}
and last offset is now {}",
- file().getAbsolutePath(), entries, mmap().position(),
lastOffset);
- } finally {
- lock.unlock();
- }
+ file().getAbsolutePath(), entries, mmap().position(),
lastOffset);
+ });
}
/**
* The last entry in the index
*/
private OffsetPosition lastEntry() {
- lock.lock();
- try {
+ return inRemapReadLock(() -> {
int entries = entries();
if (entries == 0)
return new OffsetPosition(baseOffset(), 0);
else
return parseEntry(mmap(), entries - 1);
- } finally {
- lock.unlock();
- }
+ });
}
}
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java
index 3c3fa887fc6..3043c17cf8a 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java
@@ -76,10 +76,12 @@ public class TimeIndex extends AbstractIndex {
TimestampOffset entry = lastEntry();
long lastTimestamp = entry.timestamp;
long lastOffset = entry.offset;
- if (entries() != 0 && lastTimestamp < timestamp(mmap(), 0))
- throw new CorruptIndexException("Corrupt time index found, time
index file (" + file().getAbsolutePath() + ") has "
- + "non-zero size but the last timestamp is " + lastTimestamp +
" which is less than the first timestamp "
- + timestamp(mmap(), 0));
+ inRemapReadLock(() -> {
+ if (entries() != 0 && lastTimestamp < timestamp(mmap(), 0))
+ throw new CorruptIndexException("Corrupt time index found,
time index file (" + file().getAbsolutePath() + ") has "
+ + "non-zero size but the last timestamp is " +
lastTimestamp + " which is less than the first timestamp "
+ + timestamp(mmap(), 0));
+ });
if (entries() != 0 && lastOffset < baseOffset())
throw new CorruptIndexException("Corrupt time index found, time
index file (" + file().getAbsolutePath() + ") has "
+ "non-zero size but the last offset is " + lastOffset + "
which is less than the first offset " + baseOffset());
@@ -94,8 +96,7 @@ public class TimeIndex extends AbstractIndex {
*/
@Override
public void truncateTo(long offset) {
- lock.lock();
- try {
+ inLock(() -> {
ByteBuffer idx = mmap().duplicate();
int slot = largestLowerBoundSlotFor(idx, offset,
IndexSearchType.VALUE);
@@ -113,9 +114,7 @@ public class TimeIndex extends AbstractIndex {
newEntries = slot + 1;
truncateToEntries(newEntries);
- } finally {
- lock.unlock();
- }
+ });
}
// We override the full check to reserve the last time index entry slot
for the on roll call.
@@ -134,7 +133,7 @@ public class TimeIndex extends AbstractIndex {
* @return The timestamp/offset pair at that entry
*/
public TimestampOffset entry(int n) {
- return maybeLock(lock, () -> {
+ return inRemapReadLock(() -> {
if (n >= entries())
throw new IllegalArgumentException("Attempt to fetch the " + n
+ "th entry from time index "
+ file().getAbsolutePath() + " which has size " +
entries());
@@ -151,7 +150,7 @@ public class TimeIndex extends AbstractIndex {
* @return The time index entry found.
*/
public TimestampOffset lookup(long targetTimestamp) {
- return maybeLock(lock, () -> {
+ return inRemapReadLock(() -> {
ByteBuffer idx = mmap().duplicate();
int slot = largestLowerBoundSlotFor(idx, targetTimestamp,
IndexSearchType.KEY);
if (slot == -1)
@@ -181,8 +180,7 @@ public class TimeIndex extends AbstractIndex {
* gets rolled or the segment is closed.
*/
public void maybeAppend(long timestamp, long offset, boolean
skipFullCheck) {
- lock.lock();
- try {
+ inLock(() -> {
if (!skipFullCheck && isFull())
throw new IllegalArgumentException("Attempt to append to a
full time index (size = " + entries() + ").");
@@ -212,23 +210,18 @@ public class TimeIndex extends AbstractIndex {
if (entries() * ENTRY_SIZE != mmap.position())
throw new IllegalStateException(entries() + " entries but
file position in index is " + mmap.position());
}
- } finally {
- lock.unlock();
- }
+ });
}
@Override
public boolean resize(int newSize) throws IOException {
- lock.lock();
- try {
+ return inLock(() -> {
if (super.resize(newSize)) {
this.lastEntry = lastEntryFromIndexFile();
return true;
} else
return false;
- } finally {
- lock.unlock();
- }
+ });
}
// Visible for testing, we can make this protected once TimeIndexTest is
in the same package as this class
@@ -259,30 +252,24 @@ public class TimeIndex extends AbstractIndex {
* Read the last entry from the index file. This operation involves disk
access.
*/
private TimestampOffset lastEntryFromIndexFile() {
- lock.lock();
- try {
+ return inRemapReadLock(() -> {
int entries = entries();
if (entries == 0)
return new TimestampOffset(RecordBatch.NO_TIMESTAMP,
baseOffset());
else
return parseEntry(mmap(), entries - 1);
- } finally {
- lock.unlock();
- }
+ });
}
/**
* Truncates index to a known number of entries.
*/
private void truncateToEntries(int entries) {
- lock.lock();
- try {
+ inLock(() -> {
super.truncateToEntries0(entries);
this.lastEntry = lastEntryFromIndexFile();
log.debug("Truncated index {} to {} entries; position is now {}
and last entry is now {}",
file().getAbsolutePath(), entries, mmap().position(),
lastEntry.offset);
- } finally {
- lock.unlock();
- }
+ });
}
}
diff --git
a/storage/src/test/java/org/apache/kafka/storage/internals/log/AbstractIndexTest.java
b/storage/src/test/java/org/apache/kafka/storage/internals/log/AbstractIndexTest.java
new file mode 100644
index 00000000000..2caa21a45c5
--- /dev/null
+++
b/storage/src/test/java/org/apache/kafka/storage/internals/log/AbstractIndexTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class AbstractIndexTest {
+ private static class TestIndex extends AbstractIndex {
+ private boolean unmapInvoked = false;
+ private MappedByteBuffer unmappedBuffer = null;
+ public TestIndex(File file, long baseOffset, int maxIndexSize, boolean
writable) throws IOException {
+ super(file, baseOffset, maxIndexSize, writable);
+ }
+
+ @Override
+ protected int entrySize() {
+ return 1;
+ }
+
+ @Override
+ protected IndexEntry parseEntry(ByteBuffer buffer, int n) {
+ return null;
+ }
+
+ @Override
+ public void sanityCheck() {
+ // unused
+ }
+
+ @Override
+ protected void truncate() {
+ // unused
+ }
+
+ @Override
+ public void truncateTo(long offset) {
+ // unused
+ }
+
+ @Override
+ public void forceUnmap() throws IOException {
+ unmapInvoked = true;
+ unmappedBuffer = mmap();
+ }
+ }
+
+ @Test
+ public void testResizeInvokeUnmap() throws IOException {
+ File f = new File(TestUtils.tempDirectory(), "test-index");
+ TestIndex idx = new TestIndex(f, 0L, 100, true);
+ MappedByteBuffer oldMmap = idx.mmap();
+ assertNotNull(idx.mmap(), "MappedByteBuffer should not be null");
+ assertFalse(idx.unmapInvoked, "Unmap should not have been invoked
yet");
+
+ boolean changed = idx.resize(80);
+ assertTrue(changed);
+ assertTrue(idx.unmapInvoked, "Unmap should have been invoked after
resize");
+ assertSame(oldMmap, idx.unmappedBuffer, "old mmap should be unmapped");
+ assertNotSame(idx.unmappedBuffer, idx.mmap());
+ }
+}
diff --git
a/storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetIndexTest.java
b/storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetIndexTest.java
index ad7fa590852..cd0dd404b1e 100644
---
a/storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetIndexTest.java
+++
b/storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetIndexTest.java
@@ -266,4 +266,4 @@ public class OffsetIndexTest {
Exception e = assertThrows(Exception.class, () -> idx.append(offset,
1), message);
assertEquals(IllegalArgumentException.class, e.getClass(), "Got an
unexpected exception.");
}
-}
\ No newline at end of file
+}