This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 7f8c2a5fc948700b67879788c118d234aa5a66d8
Author: Masahiro Mori <47715365+forest0...@users.noreply.github.com>
AuthorDate: Wed Jul 16 02:07:01 2025 +0900

    MINOR: Refactor LockUtils and improve comments (follow up to KAFKA-19390) (#20131)
    
    This PR performs a refactoring of LockUtils and improves inline
    comments, as a follow-up to https://github.com/apache/kafka/pull/19961.
    
    Reviewers: Chia-Ping Tsai <chia7...@gmail.com>, Jun Rao <jun...@gmail.com>
---
 .../org/apache/kafka/server/util/LockUtils.java    | 49 +---------------------
 .../kafka/storage/internals/log/AbstractIndex.java | 46 ++++++++++----------
 .../kafka/storage/internals/log/TimeIndex.java     |  2 +-
 3 files changed, 24 insertions(+), 73 deletions(-)

diff --git a/server-common/src/main/java/org/apache/kafka/server/util/LockUtils.java b/server-common/src/main/java/org/apache/kafka/server/util/LockUtils.java
index 568d109daf3..86338726d5e 100644
--- a/server-common/src/main/java/org/apache/kafka/server/util/LockUtils.java
+++ b/server-common/src/main/java/org/apache/kafka/server/util/LockUtils.java
@@ -18,7 +18,6 @@ package org.apache.kafka.server.util;
 
 import java.util.Objects;
 import java.util.concurrent.locks.Lock;
-import java.util.function.Supplier;
 
 /**
  * A utility class providing helper methods for working with {@link Lock} objects.
@@ -35,50 +34,6 @@ public class LockUtils {
         void run() throws E;
     }
 
-    /**
-     * Executes the given {@link Supplier} within the context of the specified {@link Lock}.
-     * The lock is acquired before executing the supplier and released after the execution,
-     * ensuring that the lock is always released, even if an exception is thrown.
-     *
-     * @param <T>      the type of the result returned by the supplier
-     * @param lock     the lock to be acquired and released
-     * @param supplier the supplier to be executed within the lock context
-     * @return the result of the supplier
-     * @throws NullPointerException if either {@code lock} or {@code supplier} is null
-     */
-    public static <T> T inLock(Lock lock, Supplier<T> supplier) {
-        Objects.requireNonNull(lock, "Lock must not be null");
-        Objects.requireNonNull(supplier, "Supplier must not be null");
-
-        lock.lock();
-        try {
-            return supplier.get();
-        } finally {
-            lock.unlock();
-        }
-    }
-
-    /**
-     * Executes the given {@link Runnable} within the context of the specified {@link Lock}.
-     * The lock is acquired before executing the runnable and released after the execution,
-     * ensuring that the lock is always released, even if an exception is thrown.
-     *
-     * @param lock     the lock to be acquired and released
-     * @param runnable the runnable to be executed within the lock context
-     * @throws NullPointerException if either {@code lock} or {@code runnable} is null
-     */
-    public static void inLock(Lock lock, Runnable runnable) {
-        Objects.requireNonNull(lock, "Lock must not be null");
-        Objects.requireNonNull(runnable, "Runnable must not be null");
-
-        lock.lock();
-        try {
-            runnable.run();
-        } finally {
-            lock.unlock();
-        }
-    }
-
     /**
      * Executes the given {@link ThrowingSupplier} within the context of the specified {@link Lock}.
      * The lock is acquired before executing the supplier and released after the execution,
@@ -92,7 +47,7 @@ public class LockUtils {
      * @throws E if an exception occurs during the execution of the supplier
      * @throws NullPointerException if either {@code lock} or {@code supplier} is null
      */
-    public static <T, E extends Exception> T inLockThrows(Lock lock, ThrowingSupplier<T, E> supplier) throws E {
+    public static <T, E extends Exception> T inLock(Lock lock, ThrowingSupplier<T, E> supplier) throws E {
         Objects.requireNonNull(lock, "Lock must not be null");
         Objects.requireNonNull(supplier, "Supplier must not be null");
 
@@ -115,7 +70,7 @@ public class LockUtils {
      * @throws E if an exception occurs during the execution of the runnable
      * @throws NullPointerException if either {@code lock} or {@code runnable} is null
      */
-    public static <E extends Exception> void inLockThrows(Lock lock, ThrowingRunnable<E> runnable) throws E {
+    public static <E extends Exception> void inLock(Lock lock, ThrowingRunnable<E> runnable) throws E {
         Objects.requireNonNull(lock, "Lock must not be null");
         Objects.requireNonNull(runnable, "Runnable must not be null");
 
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java
index eddd4ed8070..12e99b34739 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java
@@ -35,7 +35,6 @@ import java.util.Objects;
 import java.util.OptionalInt;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Supplier;
 
 /**
  * The abstract index class which holds entry format agnostic methods.
@@ -48,7 +47,15 @@ public abstract class AbstractIndex implements Closeable {
 
     private static final Logger log = LoggerFactory.getLogger(AbstractIndex.class);
 
-    // Serializes all index operations that mutate internal state
+    // Serializes all index operations that mutate internal state.
+    // Readers do not need to acquire this lock because:
+    //  1) MappedByteBuffer provides direct access to the OS-level buffer cache,
+    //     which allows concurrent reads in practice.
+    //  2) Clients only read committed data and are not affected by concurrent appends/truncates.
+    //     In the rare case when the data is truncated, the follower could read inconsistent data.
+    //     The follower has the logic to ignore the inconsistent data through crc and leader epoch.
+    //  3) Read and remap operations are coordinated via remapLock to ensure visibility of the
+    //     underlying mmap.
     private final ReentrantLock lock = new ReentrantLock();
     // Allows concurrent read operations while ensuring exclusive access if the underlying mmap is changed
     private final ReentrantReadWriteLock remapLock = new ReentrantReadWriteLock();
@@ -191,8 +198,8 @@ public abstract class AbstractIndex implements Closeable {
      * @return a boolean indicating whether the size of the memory map and the underneath file is changed or not.
      */
     public boolean resize(int newSize) throws IOException {
-        return inLockThrows(() ->
-                inRemapWriteLockThrows(() -> {
+        return inLock(() ->
+                inRemapWriteLock(() -> {
                     int roundedNewSize = roundDownToExactMultiple(newSize, entrySize());
 
                     if (length == roundedNewSize) {
@@ -258,7 +265,7 @@ public abstract class AbstractIndex implements Closeable {
      * the file.
      */
     public void trimToValidSize() throws IOException {
-        inLockThrows(() -> {
+        inLock(() -> {
             if (mmap != null) {
                 resize(entrySize() * entries);
             }
@@ -282,10 +289,7 @@ public abstract class AbstractIndex implements Closeable {
         // However, in some cases it can pause application threads(STW) for a long moment reading metadata from a physical disk.
         // To prevent this, we forcefully cleanup memory mapping within proper execution which never affects API responsiveness.
         // See https://issues.apache.org/jira/browse/KAFKA-4614 for the details.
-        inLockThrows(() ->
-                inRemapWriteLockThrows(() -> {
-                    safeForceUnmap();
-                }));
+        inLock(() -> inRemapWriteLock(this::safeForceUnmap));
     }
 
     /**
@@ -412,36 +416,28 @@ public abstract class AbstractIndex implements Closeable {
         mmap.position(entries * entrySize());
     }
 
-    protected final <T> T inLock(Supplier<T> action) {
+    protected final <T, E extends Exception> T inLock(LockUtils.ThrowingSupplier<T, E> action) throws E {
         return LockUtils.inLock(lock, action);
     }
 
-    protected final void inLock(Runnable action) {
+    protected final <E extends Exception> void inLock(LockUtils.ThrowingRunnable<E> action) throws E {
         LockUtils.inLock(lock, action);
     }
 
-    protected final <T, E extends Exception> T inLockThrows(LockUtils.ThrowingSupplier<T, E> action) throws E {
-        return LockUtils.inLockThrows(lock, action);
-    }
-
-    protected final <E extends Exception> void inLockThrows(LockUtils.ThrowingRunnable<E> action) throws E {
-        LockUtils.inLockThrows(lock, action);
-    }
-
-    protected final <T> T inRemapReadLock(Supplier<T> action) {
+    protected final <T, E extends Exception> T inRemapReadLock(LockUtils.ThrowingSupplier<T, E> action) throws E {
         return LockUtils.inLock(remapLock.readLock(), action);
     }
 
-    protected final void inRemapReadLock(Runnable action) {
+    protected final <E extends Exception> void inRemapReadLock(LockUtils.ThrowingRunnable<E> action) throws E {
         LockUtils.inLock(remapLock.readLock(), action);
     }
 
-    protected final <T, E extends Exception> T inRemapWriteLockThrows(LockUtils.ThrowingSupplier<T, E> action) throws E {
-        return LockUtils.inLockThrows(remapLock.writeLock(), action);
+    protected final <T, E extends Exception> T inRemapWriteLock(LockUtils.ThrowingSupplier<T, E> action) throws E {
+        return LockUtils.inLock(remapLock.writeLock(), action);
     }
 
-    protected final <E extends Exception> void inRemapWriteLockThrows(LockUtils.ThrowingRunnable<E> action) throws E {
-        LockUtils.inLockThrows(remapLock.writeLock(), action);
+    protected final <E extends Exception> void inRemapWriteLock(LockUtils.ThrowingRunnable<E> action) throws E {
+        LockUtils.inLock(remapLock.writeLock(), action);
     }
 
     /**
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java
index e6f50da24ee..3043c17cf8a 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java
@@ -215,7 +215,7 @@ public class TimeIndex extends AbstractIndex {
 
     @Override
     public boolean resize(int newSize) throws IOException {
-        return inLockThrows(() -> {
+        return inLock(() -> {
             if (super.resize(newSize)) {
                 this.lastEntry = lastEntryFromIndexFile();
                 return true;

Reply via email to