This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch 4.1 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 7f8c2a5fc948700b67879788c118d234aa5a66d8 Author: Masahiro Mori <47715365+forest0...@users.noreply.github.com> AuthorDate: Wed Jul 16 02:07:01 2025 +0900 MINOR: Refactor LockUtils and improve comments (follow up to KAFKA-19390) (#20131) This PR performs a refactoring of LockUtils and improves inline comments, as a follow-up to https://github.com/apache/kafka/pull/19961. Reviewers: Chia-Ping Tsai <chia7...@gmail.com>, Jun Rao <jun...@gmail.com> --- .../org/apache/kafka/server/util/LockUtils.java | 49 +--------------------- .../kafka/storage/internals/log/AbstractIndex.java | 46 ++++++++++---------- .../kafka/storage/internals/log/TimeIndex.java | 2 +- 3 files changed, 24 insertions(+), 73 deletions(-) diff --git a/server-common/src/main/java/org/apache/kafka/server/util/LockUtils.java b/server-common/src/main/java/org/apache/kafka/server/util/LockUtils.java index 568d109daf3..86338726d5e 100644 --- a/server-common/src/main/java/org/apache/kafka/server/util/LockUtils.java +++ b/server-common/src/main/java/org/apache/kafka/server/util/LockUtils.java @@ -18,7 +18,6 @@ package org.apache.kafka.server.util; import java.util.Objects; import java.util.concurrent.locks.Lock; -import java.util.function.Supplier; /** * A utility class providing helper methods for working with {@link Lock} objects. @@ -35,50 +34,6 @@ public class LockUtils { void run() throws E; } - /** - * Executes the given {@link Supplier} within the context of the specified {@link Lock}. - * The lock is acquired before executing the supplier and released after the execution, - * ensuring that the lock is always released, even if an exception is thrown. - * - * @param <T> the type of the result returned by the supplier - * @param lock the lock to be acquired and released - * @param supplier the supplier to be executed within the lock context - * @return the result of the supplier - * @throws NullPointerException if either {@code lock} or {@code supplier} is null - */ - public static <T> T inLock(Lock lock, Supplier<T> supplier) { - Objects.requireNonNull(lock, "Lock must not be null"); - Objects.requireNonNull(supplier, "Supplier must not be null"); - - lock.lock(); - try { - return supplier.get(); - } finally { - lock.unlock(); - } - } - - /** - * Executes the given {@link Runnable} within the context of the specified {@link Lock}. - * The lock is acquired before executing the runnable and released after the execution, - * ensuring that the lock is always released, even if an exception is thrown. - * - * @param lock the lock to be acquired and released - * @param runnable the runnable to be executed within the lock context - * @throws NullPointerException if either {@code lock} or {@code runnable} is null - */ - public static void inLock(Lock lock, Runnable runnable) { - Objects.requireNonNull(lock, "Lock must not be null"); - Objects.requireNonNull(runnable, "Runnable must not be null"); - - lock.lock(); - try { - runnable.run(); - } finally { - lock.unlock(); - } - } - /** * Executes the given {@link ThrowingSupplier} within the context of the specified {@link Lock}. * The lock is acquired before executing the supplier and released after the execution, @@ -92,7 +47,7 @@ public class LockUtils { * @throws E if an exception occurs during the execution of the supplier * @throws NullPointerException if either {@code lock} or {@code supplier} is null */ - public static <T, E extends Exception> T inLockThrows(Lock lock, ThrowingSupplier<T, E> supplier) throws E { + public static <T, E extends Exception> T inLock(Lock lock, ThrowingSupplier<T, E> supplier) throws E { Objects.requireNonNull(lock, "Lock must not be null"); Objects.requireNonNull(supplier, "Supplier must not be null"); @@ -115,7 +70,7 @@ public class LockUtils { * @throws E if an exception occurs during the execution of the runnable * @throws NullPointerException if either {@code lock} or {@code runnable} is null */ - public static <E extends Exception> void inLockThrows(Lock lock, ThrowingRunnable<E> runnable) throws E { + public static <E extends Exception> void inLock(Lock lock, ThrowingRunnable<E> runnable) throws E { Objects.requireNonNull(lock, "Lock must not be null"); Objects.requireNonNull(runnable, "Runnable must not be null"); diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java index eddd4ed8070..12e99b34739 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java @@ -35,7 +35,6 @@ import java.util.Objects; import java.util.OptionalInt; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.function.Supplier; /** * The abstract index class which holds entry format agnostic methods. @@ -48,7 +47,15 @@ public abstract class AbstractIndex implements Closeable { private static final Logger log = LoggerFactory.getLogger(AbstractIndex.class); - // Serializes all index operations that mutate internal state + // Serializes all index operations that mutate internal state. + // Readers do not need to acquire this lock because: + // 1) MappedByteBuffer provides direct access to the OS-level buffer cache, + // which allows concurrent reads in practice. + // 2) Clients only read committed data and are not affected by concurrent appends/truncates. + // In the rare case when the data is truncated, the follower could read inconsistent data. + // The follower has the logic to ignore the inconsistent data through crc and leader epoch. + // 3) Read and remap operations are coordinated via remapLock to ensure visibility of the + // underlying mmap. private final ReentrantLock lock = new ReentrantLock(); // Allows concurrent read operations while ensuring exclusive access if the underlying mmap is changed private final ReentrantReadWriteLock remapLock = new ReentrantReadWriteLock(); @@ -191,8 +198,8 @@ public abstract class AbstractIndex implements Closeable { * @return a boolean indicating whether the size of the memory map and the underneath file is changed or not. */ public boolean resize(int newSize) throws IOException { - return inLockThrows(() -> - inRemapWriteLockThrows(() -> { + return inLock(() -> + inRemapWriteLock(() -> { int roundedNewSize = roundDownToExactMultiple(newSize, entrySize()); if (length == roundedNewSize) { @@ -258,7 +265,7 @@ public abstract class AbstractIndex implements Closeable { * the file. */ public void trimToValidSize() throws IOException { - inLockThrows(() -> { + inLock(() -> { if (mmap != null) { resize(entrySize() * entries); } @@ -282,10 +289,7 @@ public abstract class AbstractIndex implements Closeable { // However, in some cases it can pause application threads(STW) for a long moment reading metadata from a physical disk. // To prevent this, we forcefully cleanup memory mapping within proper execution which never affects API responsiveness. // See https://issues.apache.org/jira/browse/KAFKA-4614 for the details. - inLockThrows(() -> - inRemapWriteLockThrows(() -> { - safeForceUnmap(); - })); + inLock(() -> inRemapWriteLock(this::safeForceUnmap)); } /** @@ -412,36 +416,28 @@ public abstract class AbstractIndex implements Closeable { mmap.position(entries * entrySize()); } - protected final <T> T inLock(Supplier<T> action) { + protected final <T, E extends Exception> T inLock(LockUtils.ThrowingSupplier<T, E> action) throws E { return LockUtils.inLock(lock, action); } - protected final void inLock(Runnable action) { + protected final <E extends Exception> void inLock(LockUtils.ThrowingRunnable<E> action) throws E { LockUtils.inLock(lock, action); } - protected final <T, E extends Exception> T inLockThrows(LockUtils.ThrowingSupplier<T, E> action) throws E { - return LockUtils.inLockThrows(lock, action); - } - - protected final <E extends Exception> void inLockThrows(LockUtils.ThrowingRunnable<E> action) throws E { - LockUtils.inLockThrows(lock, action); - } - - protected final <T> T inRemapReadLock(Supplier<T> action) { + protected final <T, E extends Exception> T inRemapReadLock(LockUtils.ThrowingSupplier<T, E> action) throws E { return LockUtils.inLock(remapLock.readLock(), action); } - protected final void inRemapReadLock(Runnable action) { + protected final <E extends Exception> void inRemapReadLock(LockUtils.ThrowingRunnable<E> action) throws E { LockUtils.inLock(remapLock.readLock(), action); } - protected final <T, E extends Exception> T inRemapWriteLockThrows(LockUtils.ThrowingSupplier<T, E> action) throws E { - return LockUtils.inLockThrows(remapLock.writeLock(), action); + protected final <T, E extends Exception> T inRemapWriteLock(LockUtils.ThrowingSupplier<T, E> action) throws E { + return LockUtils.inLock(remapLock.writeLock(), action); } - protected final <E extends Exception> void inRemapWriteLockThrows(LockUtils.ThrowingRunnable<E> action) throws E { - LockUtils.inLockThrows(remapLock.writeLock(), action); + protected final <E extends Exception> void inRemapWriteLock(LockUtils.ThrowingRunnable<E> action) throws E { + LockUtils.inLock(remapLock.writeLock(), action); } /** diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java index e6f50da24ee..3043c17cf8a 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java @@ -215,7 +215,7 @@ public class TimeIndex extends AbstractIndex { @Override public boolean resize(int newSize) throws IOException { - return inLockThrows(() -> { + return inLock(() -> { if (super.resize(newSize)) { this.lastEntry = lastEntryFromIndexFile(); return true;