This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 7e27a78f491 KAFKA-19390 Call safeForceUnmap() in 
AbstractIndex.resize() on Linux to prevent stale mmap of index files (#20621)
7e27a78f491 is described below

commit 7e27a78f491c9df0bf41538e20e1254bd0c3dfe8
Author: Masahiro Mori <[email protected]>
AuthorDate: Mon Oct 6 23:28:03 2025 +0900

    KAFKA-19390 Call safeForceUnmap() in AbstractIndex.resize() on Linux to 
prevent stale mmap of index files (#20621)
    
    This backports
    [KAFKA-19390](https://issues.apache.org/jira/browse/KAFKA-19390) to
    kafka v4.0.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../org/apache/kafka/server/util/LockUtils.java    |  84 +++++++++++++
 .../kafka/storage/internals/log/AbstractIndex.java | 130 +++++++++++----------
 .../kafka/storage/internals/log/OffsetIndex.java   |  38 +++---
 .../kafka/storage/internals/log/TimeIndex.java     |  49 +++-----
 .../storage/internals/log/AbstractIndexTest.java   |  89 ++++++++++++++
 .../storage/internals/log/OffsetIndexTest.java     |   2 +-
 6 files changed, 272 insertions(+), 120 deletions(-)

diff --git 
a/server-common/src/main/java/org/apache/kafka/server/util/LockUtils.java 
b/server-common/src/main/java/org/apache/kafka/server/util/LockUtils.java
new file mode 100644
index 00000000000..86338726d5e
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/server/util/LockUtils.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import java.util.Objects;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * A utility class providing helper methods for working with {@link Lock} 
objects.
+ * This class simplifies the usage of locks by encapsulating common patterns,
+ * such as acquiring and releasing locks in a safe manner.
+ */
+public class LockUtils {
+    @FunctionalInterface
+    public interface ThrowingSupplier<T, E extends Exception> {
+        T get() throws E;
+    }
+    @FunctionalInterface
+    public interface ThrowingRunnable<E extends Exception> {
+        void run() throws E;
+    }
+
+    /**
+     * Executes the given {@link ThrowingSupplier} within the context of the 
specified {@link Lock}.
+     * The lock is acquired before executing the supplier and released after 
the execution,
+     * ensuring that the lock is always released, even if an exception is 
thrown.
+     *
+     * @param <T>      the type of the result returned by the supplier
+     * @param <E>      the type of exception that may be thrown by the supplier
+     * @param lock     the lock to be acquired and released
+     * @param supplier the supplier to be executed within the lock context
+     * @return the result of the supplier
+     * @throws E if an exception occurs during the execution of the supplier
+     * @throws NullPointerException if either {@code lock} or {@code supplier} 
is null
+     */
+    public static <T, E extends Exception> T inLock(Lock lock, 
ThrowingSupplier<T, E> supplier) throws E {
+        Objects.requireNonNull(lock, "Lock must not be null");
+        Objects.requireNonNull(supplier, "Supplier must not be null");
+
+        lock.lock();
+        try {
+            return supplier.get();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Executes the given {@link ThrowingRunnable} within the context of the 
specified {@link Lock}.
+     * The lock is acquired before executing the runnable and released after 
the execution,
+     * ensuring that the lock is always released, even if an exception is 
thrown.
+     *
+     * @param <E>      the type of exception that may be thrown by the runnable
+     * @param lock     the lock to be acquired and released
+     * @param runnable the runnable to be executed within the lock context
+     * @throws E if an exception occurs during the execution of the runnable
+     * @throws NullPointerException if either {@code lock} or {@code runnable} 
is null
+     */
+    public static <E extends Exception> void inLock(Lock lock, 
ThrowingRunnable<E> runnable) throws E {
+        Objects.requireNonNull(lock, "Lock must not be null");
+        Objects.requireNonNull(runnable, "Runnable must not be null");
+
+        lock.lock();
+        try {
+            runnable.run();
+        } finally {
+            lock.unlock();
+        }
+    }
+}
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java
 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java
index 46ceb4801a2..f29f87b023d 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java
@@ -17,8 +17,8 @@
 package org.apache.kafka.storage.internals.log;
 
 import org.apache.kafka.common.utils.ByteBufferUnmapper;
-import org.apache.kafka.common.utils.OperatingSystem;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.LockUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,8 +33,8 @@ import java.nio.channels.FileChannel;
 import java.nio.file.Files;
 import java.util.Objects;
 import java.util.OptionalInt;
-import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  * The abstract index class which holds entry format agnostic methods.
@@ -47,7 +47,18 @@ public abstract class AbstractIndex implements Closeable {
 
     private static final Logger log = 
LoggerFactory.getLogger(AbstractIndex.class);
 
-    protected final ReentrantLock lock = new ReentrantLock();
+    // Serializes all index operations that mutate internal state.
+    // Readers do not need to acquire this lock because:
+    //  1) MappedByteBuffer provides direct access to the OS-level buffer 
cache,
+    //     which allows concurrent reads in practice.
+    //  2) Clients only read committed data and are not affected by concurrent 
appends/truncates.
+    //     In the rare case when the data is truncated, the follower could 
read inconsistent data.
+    //     The follower has the logic to ignore the inconsistent data through 
crc and leader epoch.
+    //  3) Read and remap operations are coordinated via remapLock to ensure 
visibility of the
+    //     underlying mmap.
+    private final ReentrantLock lock = new ReentrantLock();
+    // Allows concurrent read operations while ensuring exclusive access if 
the underlying mmap is changed
+    private final ReentrantReadWriteLock remapLock = new 
ReentrantReadWriteLock();
 
     private final long baseOffset;
     private final int maxIndexSize;
@@ -187,36 +198,32 @@ public abstract class AbstractIndex implements Closeable {
      * @return a boolean indicating whether the size of the memory map and the 
underneath file is changed or not.
      */
     public boolean resize(int newSize) throws IOException {
-        lock.lock();
-        try {
-            int roundedNewSize = roundDownToExactMultiple(newSize, 
entrySize());
-
-            if (length == roundedNewSize) {
-                log.debug("Index {} was not resized because it already has 
size {}", file.getAbsolutePath(), roundedNewSize);
-                return false;
-            } else {
-                RandomAccessFile raf = new RandomAccessFile(file, "rw");
-                try {
-                    int position = mmap.position();
-
-                    /* Windows or z/OS won't let us modify the file length 
while the file is mmapped :-( */
-                    if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS)
-                        safeForceUnmap();
-                    raf.setLength(roundedNewSize);
-                    this.length = roundedNewSize;
-                    mmap = 
raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize);
-                    this.maxEntries = mmap.limit() / entrySize();
-                    mmap.position(position);
-                    log.debug("Resized {} to {}, position is {} and limit is 
{}", file.getAbsolutePath(), roundedNewSize,
-                            mmap.position(), mmap.limit());
-                    return true;
-                } finally {
-                    Utils.closeQuietly(raf, "index file " + file.getName());
-                }
-            }
-        } finally {
-            lock.unlock();
-        }
+        return inLock(() ->
+                inRemapWriteLock(() -> {
+                    int roundedNewSize = roundDownToExactMultiple(newSize, 
entrySize());
+
+                    if (length == roundedNewSize) {
+                        log.debug("Index {} was not resized because it already 
has size {}", file.getAbsolutePath(), roundedNewSize);
+                        return false;
+                    } else {
+                        RandomAccessFile raf = new RandomAccessFile(file, 
"rw");
+                        try {
+                            int position = mmap.position();
+
+                            safeForceUnmap();
+                            raf.setLength(roundedNewSize);
+                            this.length = roundedNewSize;
+                            mmap = 
raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize);
+                            this.maxEntries = mmap.limit() / entrySize();
+                            mmap.position(position);
+                            log.debug("Resized {} to {}, position is {} and 
limit is {}", file.getAbsolutePath(), roundedNewSize,
+                                    mmap.position(), mmap.limit());
+                            return true;
+                        } finally {
+                            Utils.closeQuietly(raf, "index file " + 
file.getName());
+                        }
+                    }
+                }));
     }
 
     /**
@@ -236,12 +243,9 @@ public abstract class AbstractIndex implements Closeable {
      * Flush the data in the index to disk
      */
     public void flush() {
-        lock.lock();
-        try {
+        inLock(() -> {
             mmap.force();
-        } finally {
-            lock.unlock();
-        }
+        });
     }
 
     /**
@@ -261,14 +265,11 @@ public abstract class AbstractIndex implements Closeable {
      * the file.
      */
     public void trimToValidSize() throws IOException {
-        lock.lock();
-        try {
+        inLock(() -> {
             if (mmap != null) {
                 resize(entrySize() * entries);
             }
-        } finally {
-            lock.unlock();
-        }
+        });
     }
 
     /**
@@ -288,12 +289,7 @@ public abstract class AbstractIndex implements Closeable {
         // However, in some cases it can pause application threads(STW) for a 
long moment reading metadata from a physical disk.
         // To prevent this, we forcefully cleanup memory mapping within proper 
execution which never affects API responsiveness.
         // See https://issues.apache.org/jira/browse/KAFKA-4614 for the 
details.
-        lock.lock();
-        try {
-            safeForceUnmap();
-        } finally {
-            lock.unlock();
-        }
+        inLock(() -> inRemapWriteLock(this::safeForceUnmap));
     }
 
     /**
@@ -420,20 +416,28 @@ public abstract class AbstractIndex implements Closeable {
         mmap.position(entries * entrySize());
     }
 
-    /**
-     * Execute the given function in a lock only if we are running on windows 
or z/OS. We do this
-     * because Windows or z/OS won't let us resize a file while it is mmapped. 
As a result we have to force unmap it
-     * and this requires synchronizing reads.
-     */
-    protected final <T, E extends Exception> T maybeLock(Lock lock, 
StorageAction<T, E> action) throws E {
-        if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS)
-            lock.lock();
-        try {
-            return action.execute();
-        } finally {
-            if (OperatingSystem.IS_WINDOWS || OperatingSystem.IS_ZOS)
-                lock.unlock();
-        }
+    protected final <T, E extends Exception> T 
inLock(LockUtils.ThrowingSupplier<T, E> action) throws E {
+        return LockUtils.inLock(lock, action);
+    }
+
+    protected final <E extends Exception> void 
inLock(LockUtils.ThrowingRunnable<E> action) throws E {
+        LockUtils.inLock(lock, action);
+    }
+
+    protected final <T, E extends Exception> T 
inRemapReadLock(LockUtils.ThrowingSupplier<T, E> action) throws E {
+        return LockUtils.inLock(remapLock.readLock(), action);
+    }
+
+    protected final <E extends Exception> void 
inRemapReadLock(LockUtils.ThrowingRunnable<E> action) throws E {
+        LockUtils.inLock(remapLock.readLock(), action);
+    }
+
+    protected final <T, E extends Exception> T 
inRemapWriteLock(LockUtils.ThrowingSupplier<T, E> action) throws E {
+        return LockUtils.inLock(remapLock.writeLock(), action);
+    }
+
+    protected final <E extends Exception> void 
inRemapWriteLock(LockUtils.ThrowingRunnable<E> action) throws E {
+        LockUtils.inLock(remapLock.writeLock(), action);
     }
 
     /**
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetIndex.java 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetIndex.java
index fe872a5cd7f..7d20edf37d3 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetIndex.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/OffsetIndex.java
@@ -56,7 +56,7 @@ public final class OffsetIndex extends AbstractIndex {
     private static final int ENTRY_SIZE = 8;
 
     /* the last offset in the index */
-    private long lastOffset;
+    private volatile long lastOffset;
 
     public OffsetIndex(File file, long baseOffset) throws IOException {
         this(file, baseOffset, -1);
@@ -95,7 +95,7 @@ public final class OffsetIndex extends AbstractIndex {
      *         the pair (baseOffset, 0) is returned.
      */
     public OffsetPosition lookup(long targetOffset) {
-        return maybeLock(lock, () -> {
+        return inRemapReadLock(() -> {
             ByteBuffer idx = mmap().duplicate();
             int slot = largestLowerBoundSlotFor(idx, targetOffset, 
IndexSearchType.KEY);
             if (slot == -1)
@@ -111,7 +111,7 @@ public final class OffsetIndex extends AbstractIndex {
      * @return The offset/position pair at that entry
      */
     public OffsetPosition entry(int n) {
-        return maybeLock(lock, () -> {
+        return inRemapReadLock(() -> {
             if (n >= entries())
                 throw new IllegalArgumentException("Attempt to fetch the " + n 
+ "th entry from index " +
                     file().getAbsolutePath() + ", which has size " + 
entries());
@@ -125,7 +125,7 @@ public final class OffsetIndex extends AbstractIndex {
      * such offset.
      */
     public Optional<OffsetPosition> fetchUpperBoundOffset(OffsetPosition 
fetchOffset, int fetchSize) {
-        return maybeLock(lock, () -> {
+        return inRemapReadLock(() -> {
             ByteBuffer idx = mmap().duplicate();
             int slot = smallestUpperBoundSlotFor(idx, fetchOffset.position + 
fetchSize, IndexSearchType.VALUE);
             if (slot == -1)
@@ -141,8 +141,7 @@ public final class OffsetIndex extends AbstractIndex {
      * @throws InvalidOffsetException if provided offset is not larger than 
the last offset
      */
     public void append(long offset, int position) {
-        lock.lock();
-        try {
+        inLock(() -> {
             if (isFull())
                 throw new IllegalArgumentException("Attempt to append to a 
full index (size = " + entries() + ").");
 
@@ -157,15 +156,12 @@ public final class OffsetIndex extends AbstractIndex {
             } else
                 throw new InvalidOffsetException("Attempt to append an offset 
" + offset + " to position " + entries() +
                     " no larger than the last offset appended (" + lastOffset 
+ ") to " + file().getAbsolutePath());
-        } finally {
-            lock.unlock();
-        }
+        });
     }
 
     @Override
     public void truncateTo(long offset) {
-        lock.lock();
-        try {
+        inLock(() -> {
             ByteBuffer idx = mmap().duplicate();
             int slot = largestLowerBoundSlotFor(idx, offset, 
IndexSearchType.KEY);
 
@@ -182,9 +178,7 @@ public final class OffsetIndex extends AbstractIndex {
             else
                 newEntries = slot + 1;
             truncateToEntries(newEntries);
-        } finally {
-            lock.unlock();
-        }
+        });
     }
 
     public long lastOffset() {
@@ -218,30 +212,24 @@ public final class OffsetIndex extends AbstractIndex {
      * Truncates index to a known number of entries.
      */
     private void truncateToEntries(int entries) {
-        lock.lock();
-        try {
+        inLock(() -> {
             super.truncateToEntries0(entries);
             this.lastOffset = lastEntry().offset;
             log.debug("Truncated index {} to {} entries; position is now {} 
and last offset is now {}",
-                    file().getAbsolutePath(), entries, mmap().position(), 
lastOffset);
-        } finally {
-            lock.unlock();
-        }
+                file().getAbsolutePath(), entries, mmap().position(), 
lastOffset);
+        });
     }
 
     /**
      * The last entry in the index
      */
     private OffsetPosition lastEntry() {
-        lock.lock();
-        try {
+        return inRemapReadLock(() -> {
             int entries = entries();
             if (entries == 0)
                 return new OffsetPosition(baseOffset(), 0);
             else
                 return parseEntry(mmap(), entries - 1);
-        } finally {
-            lock.unlock();
-        }
+        });
     }
 }
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java
index 3c3fa887fc6..3043c17cf8a 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java
@@ -76,10 +76,12 @@ public class TimeIndex extends AbstractIndex {
         TimestampOffset entry = lastEntry();
         long lastTimestamp = entry.timestamp;
         long lastOffset = entry.offset;
-        if (entries() != 0 && lastTimestamp < timestamp(mmap(), 0))
-            throw new CorruptIndexException("Corrupt time index found, time 
index file (" + file().getAbsolutePath() + ") has "
-                + "non-zero size but the last timestamp is " + lastTimestamp + 
" which is less than the first timestamp "
-                + timestamp(mmap(), 0));
+        inRemapReadLock(() -> {
+            if (entries() != 0 && lastTimestamp < timestamp(mmap(), 0))
+                throw new CorruptIndexException("Corrupt time index found, 
time index file (" + file().getAbsolutePath() + ") has "
+                    + "non-zero size but the last timestamp is " + 
lastTimestamp + " which is less than the first timestamp "
+                    + timestamp(mmap(), 0));
+        });
         if (entries() != 0 && lastOffset < baseOffset())
             throw new CorruptIndexException("Corrupt time index found, time 
index file (" + file().getAbsolutePath() + ") has "
                 + "non-zero size but the last offset is " + lastOffset + " 
which is less than the first offset " + baseOffset());
@@ -94,8 +96,7 @@ public class TimeIndex extends AbstractIndex {
      */
     @Override
     public void truncateTo(long offset) {
-        lock.lock();
-        try {
+        inLock(() -> {
             ByteBuffer idx = mmap().duplicate();
             int slot = largestLowerBoundSlotFor(idx, offset, 
IndexSearchType.VALUE);
 
@@ -113,9 +114,7 @@ public class TimeIndex extends AbstractIndex {
                 newEntries = slot + 1;
 
             truncateToEntries(newEntries);
-        } finally {
-            lock.unlock();
-        }
+        });
     }
 
     // We override the full check to reserve the last time index entry slot 
for the on roll call.
@@ -134,7 +133,7 @@ public class TimeIndex extends AbstractIndex {
      * @return The timestamp/offset pair at that entry
      */
     public TimestampOffset entry(int n) {
-        return maybeLock(lock, () -> {
+        return inRemapReadLock(() -> {
             if (n >= entries())
                 throw new IllegalArgumentException("Attempt to fetch the " + n 
+ "th entry from time index "
                     + file().getAbsolutePath() + " which has size " + 
entries());
@@ -151,7 +150,7 @@ public class TimeIndex extends AbstractIndex {
      * @return The time index entry found.
      */
     public TimestampOffset lookup(long targetTimestamp) {
-        return maybeLock(lock, () -> {
+        return inRemapReadLock(() -> {
             ByteBuffer idx = mmap().duplicate();
             int slot = largestLowerBoundSlotFor(idx, targetTimestamp, 
IndexSearchType.KEY);
             if (slot == -1)
@@ -181,8 +180,7 @@ public class TimeIndex extends AbstractIndex {
      *                      gets rolled or the segment is closed.
      */
     public void maybeAppend(long timestamp, long offset, boolean 
skipFullCheck) {
-        lock.lock();
-        try {
+        inLock(() -> {
             if (!skipFullCheck && isFull())
                 throw new IllegalArgumentException("Attempt to append to a 
full time index (size = " + entries() + ").");
 
@@ -212,23 +210,18 @@ public class TimeIndex extends AbstractIndex {
                 if (entries() * ENTRY_SIZE != mmap.position())
                     throw new IllegalStateException(entries() + " entries but 
file position in index is " + mmap.position());
             }
-        } finally {
-            lock.unlock();
-        }
+        });
     }
 
     @Override
     public boolean resize(int newSize) throws IOException {
-        lock.lock();
-        try {
+        return inLock(() -> {
             if (super.resize(newSize)) {
                 this.lastEntry = lastEntryFromIndexFile();
                 return true;
             } else
                 return false;
-        } finally {
-            lock.unlock();
-        }
+        });
     }
 
     // Visible for testing, we can make this protected once TimeIndexTest is 
in the same package as this class
@@ -259,30 +252,24 @@ public class TimeIndex extends AbstractIndex {
      * Read the last entry from the index file. This operation involves disk 
access.
      */
     private TimestampOffset lastEntryFromIndexFile() {
-        lock.lock();
-        try {
+        return inRemapReadLock(() -> {
             int entries = entries();
             if (entries == 0)
                 return new TimestampOffset(RecordBatch.NO_TIMESTAMP, 
baseOffset());
             else
                 return parseEntry(mmap(), entries - 1);
-        } finally {
-            lock.unlock();
-        }
+        });
     }
 
     /**
      * Truncates index to a known number of entries.
      */
     private void truncateToEntries(int entries) {
-        lock.lock();
-        try {
+        inLock(() -> {
             super.truncateToEntries0(entries);
             this.lastEntry = lastEntryFromIndexFile();
             log.debug("Truncated index {} to {} entries; position is now {} 
and last entry is now {}",
                 file().getAbsolutePath(), entries, mmap().position(), 
lastEntry.offset);
-        } finally {
-            lock.unlock();
-        }
+        });
     }
 }
diff --git 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/AbstractIndexTest.java
 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/AbstractIndexTest.java
new file mode 100644
index 00000000000..2caa21a45c5
--- /dev/null
+++ 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/AbstractIndexTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class AbstractIndexTest {
+    private static class TestIndex extends AbstractIndex {
+        private boolean unmapInvoked = false;
+        private MappedByteBuffer unmappedBuffer = null;
+        public TestIndex(File file, long baseOffset, int maxIndexSize, boolean 
writable) throws IOException {
+            super(file, baseOffset, maxIndexSize, writable);
+        }
+
+        @Override
+        protected int entrySize() {
+            return 1;
+        }
+
+        @Override
+        protected IndexEntry parseEntry(ByteBuffer buffer, int n) {
+            return null;
+        }
+
+        @Override
+        public void sanityCheck() {
+            // unused
+        }
+
+        @Override
+        protected void truncate() {
+            // unused
+        }
+
+        @Override
+        public void truncateTo(long offset) {
+            // unused
+        }
+
+        @Override
+        public void forceUnmap() throws IOException {
+            unmapInvoked = true;
+            unmappedBuffer = mmap();
+        }
+    }
+
+    @Test
+    public void testResizeInvokeUnmap() throws IOException {
+        File f = new File(TestUtils.tempDirectory(), "test-index");
+        TestIndex idx = new TestIndex(f, 0L, 100, true);
+        MappedByteBuffer oldMmap = idx.mmap();
+        assertNotNull(idx.mmap(), "MappedByteBuffer should not be null");
+        assertFalse(idx.unmapInvoked, "Unmap should not have been invoked 
yet");
+
+        boolean changed = idx.resize(80);
+        assertTrue(changed);
+        assertTrue(idx.unmapInvoked, "Unmap should have been invoked after 
resize");
+        assertSame(oldMmap, idx.unmappedBuffer, "old mmap should be unmapped");
+        assertNotSame(idx.unmappedBuffer, idx.mmap());
+    }
+}
diff --git 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetIndexTest.java
 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetIndexTest.java
index ad7fa590852..cd0dd404b1e 100644
--- 
a/storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetIndexTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/storage/internals/log/OffsetIndexTest.java
@@ -266,4 +266,4 @@ public class OffsetIndexTest {
         Exception e = assertThrows(Exception.class, () -> idx.append(offset, 
1), message);
         assertEquals(IllegalArgumentException.class, e.getClass(), "Got an 
unexpected exception.");
     }
-}
\ No newline at end of file
+}

Reply via email to