This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-4.17
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit ff7ea0d7efd69fd2988cfb5c6891e1e28e903f61 Author: Hang Chen <[email protected]> AuthorDate: Thu Mar 19 23:37:54 2026 -0700 Fix read thread blocking in sendResponseAndWait causing READ_ENTRY_REQUEST p99 latency spike (#4730) * Fix read thread blocking in sendResponseAndWait causing READ_ENTRY_REQUEST p99 latency spike * address comments (cherry picked from commit 8664dd962935dc23d40218f1bf228661776580fd) --- .../bookkeeper/conf/ServerConfiguration.java | 8 +- .../bookkeeper/proto/PacketProcessorBase.java | 51 ++++-- .../bookkeeper/proto/ReadEntryProcessorTest.java | 178 +++++++++++++++++++++ 3 files changed, 220 insertions(+), 17 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java index 4d9eabb241..4bf0ded738 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java @@ -1075,10 +1075,16 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati /** * Get max number of reads in progress. 0 == unlimited. * + * <p>This limit bounds the memory used by read responses that have been read from storage + * but not yet flushed to the network. Since read response writes are non-blocking, + * without this limit a slow consumer could cause unbounded memory growth. + * The default value of 10000 provides a reasonable balance between throughput and memory usage. + * Tune based on your average entry size: memoryBudget / avgEntrySize. + * * @return Max number of reads in progress. 
*/ public int getMaxReadsInProgressLimit() { - return this.getInt(MAX_READS_IN_PROGRESS_LIMIT, 0); + return this.getInt(MAX_READS_IN_PROGRESS_LIMIT, 10000); } /** diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java index 889d3790d2..5ecda0355c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java @@ -19,8 +19,8 @@ package org.apache.bookkeeper.proto; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelPromise; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.proto.BookieProtocol.Request; import org.apache.bookkeeper.stats.OpStatsLogger; @@ -74,10 +74,12 @@ abstract class PacketProcessorBase<T extends Request> implements Runnable { protected void sendReadReqResponse(int rc, Object response, OpStatsLogger statsLogger, boolean throttle) { if (throttle) { sendResponseAndWait(rc, response, statsLogger); + // onReadRequestFinish is called asynchronously in the ChannelFutureListener + // inside sendResponseAndWait to maintain throttling without blocking the thread. } else { sendResponse(rc, response, statsLogger); + requestProcessor.onReadRequestFinish(); } - requestProcessor.onReadRequestFinish(); } protected void sendResponse(int rc, Object response, OpStatsLogger statsLogger) { @@ -150,27 +152,44 @@ abstract class PacketProcessorBase<T extends Request> implements Runnable { } /** - * Write on the channel and wait until the write is completed. + * Write on the channel and notify completion via a listener. 
* - * <p>That will make the thread to get blocked until we're able to - * write everything on the TCP stack, providing auto-throttling - * and avoiding using too much memory when handling read-requests. + * <p>This provides auto-throttling by holding the read semaphore until the write completes, + * without blocking the read thread pool thread. The read thread is freed immediately to + * process other requests, while the semaphore prevents unbounded read concurrency. */ protected void sendResponseAndWait(int rc, Object response, OpStatsLogger statsLogger) { + // Capture fields before the processor may be recycled after this method returns. + final long capturedEnqueueNanos = this.enqueueNanos; + final BookieRequestProcessor processor = this.requestProcessor; try { Channel channel = requestHandler.ctx().channel(); ChannelFuture future = channel.writeAndFlush(response); - if (!channel.eventLoop().inEventLoop()) { - future.get(); + future.addListener((ChannelFutureListener) f -> { + if (!f.isSuccess() && logger.isDebugEnabled()) { + logger.debug("Netty channel write exception. ", f.cause()); + } + if (BookieProtocol.EOK == rc) { + statsLogger.registerSuccessfulEvent( + MathUtils.elapsedNanos(capturedEnqueueNanos), TimeUnit.NANOSECONDS); + } else { + statsLogger.registerFailedEvent( + MathUtils.elapsedNanos(capturedEnqueueNanos), TimeUnit.NANOSECONDS); + } + processor.onReadRequestFinish(); + }); + } catch (Exception e) { + if (logger.isDebugEnabled()) { + logger.debug("Netty channel write exception. ", e); } - } catch (ExecutionException | InterruptedException e) { - logger.debug("Netty channel write exception. 
", e); - return; - } - if (BookieProtocol.EOK == rc) { - statsLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS); - } else { - statsLogger.registerFailedEvent(MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS); + if (BookieProtocol.EOK == rc) { + statsLogger.registerSuccessfulEvent( + MathUtils.elapsedNanos(capturedEnqueueNanos), TimeUnit.NANOSECONDS); + } else { + statsLogger.registerFailedEvent( + MathUtils.elapsedNanos(capturedEnqueueNanos), TimeUnit.NANOSECONDS); + } + processor.onReadRequestFinish(); } } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java index 251f900c09..2cdf744f24 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java @@ -19,11 +19,13 @@ package org.apache.bookkeeper.proto; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -38,10 +40,12 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicReference; import org.apache.bookkeeper.bookie.Bookie; import org.apache.bookkeeper.bookie.BookieException; import org.apache.bookkeeper.common.concurrent.FutureUtils; +import 
org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.proto.BookieProtocol.ReadRequest; import org.apache.bookkeeper.proto.BookieProtocol.Response; import org.apache.bookkeeper.stats.NullStatsLogger; @@ -195,4 +199,178 @@ public class ReadEntryProcessorTest { assertEquals(BookieProtocol.READENTRY, response.getOpCode()); assertEquals(BookieProtocol.EOK, response.getErrorCode()); } + + /** + * Test that when throttleReadResponses=true and the caller is not in the Netty event loop, + * the read thread is not blocked by the write. onReadRequestFinish() should only be called + * after the write future completes, preserving throttling without blocking the thread. + */ + @Test + public void testThrottledReadNonBlockingOnSuccess() throws Exception { + // Setup event loop to simulate read worker thread (not event loop thread) + EventLoop eventLoop = mock(EventLoop.class); + when(eventLoop.inEventLoop()).thenReturn(false); + doAnswer(inv -> { + ((Runnable) inv.getArgument(0)).run(); + return null; + }).when(eventLoop).execute(any(Runnable.class)); + when(channel.eventLoop()).thenReturn(eventLoop); + + // Use a controllable promise so we can verify deferred behavior + DefaultChannelPromise writeFuture = new DefaultChannelPromise(channel); + doAnswer(inv -> writeFuture).when(channel).writeAndFlush(any(Response.class)); + + long ledgerId = System.currentTimeMillis(); + ReadRequest request = ReadRequest.create( + BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, 1, (short) 0, new byte[]{}); + ReadEntryProcessor processor = ReadEntryProcessor.create( + request, requestHandler, requestProcessor, null, true /* throttle */); + + // run() should return immediately without blocking on the write + processor.run(); + + // Write should have been issued + verify(channel, times(1)).writeAndFlush(any(Response.class)); + // But onReadRequestFinish should NOT have been called yet — write not completed + verify(requestProcessor, never()).onReadRequestFinish(); + + // 
Complete the write + writeFuture.setSuccess(); + + // Now onReadRequestFinish should have been called + verify(requestProcessor, times(1)).onReadRequestFinish(); + } + + /** + * Test that onReadRequestFinish() is still called even when the write fails, + * so the read semaphore is always released. + */ + @Test + public void testThrottledReadNonBlockingOnWriteFailure() throws Exception { + EventLoop eventLoop = mock(EventLoop.class); + when(eventLoop.inEventLoop()).thenReturn(false); + doAnswer(inv -> { + ((Runnable) inv.getArgument(0)).run(); + return null; + }).when(eventLoop).execute(any(Runnable.class)); + when(channel.eventLoop()).thenReturn(eventLoop); + + DefaultChannelPromise writeFuture = new DefaultChannelPromise(channel); + doAnswer(inv -> writeFuture).when(channel).writeAndFlush(any(Response.class)); + + long ledgerId = System.currentTimeMillis(); + ReadRequest request = ReadRequest.create( + BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, 1, (short) 0, new byte[]{}); + ReadEntryProcessor processor = ReadEntryProcessor.create( + request, requestHandler, requestProcessor, null, true /* throttle */); + + processor.run(); + + verify(channel, times(1)).writeAndFlush(any(Response.class)); + verify(requestProcessor, never()).onReadRequestFinish(); + + // Fail the write + writeFuture.setFailure(new IOException("channel write failed")); + + // onReadRequestFinish must still be called to release the read semaphore + verify(requestProcessor, times(1)).onReadRequestFinish(); + } + + /** + * Test that when throttleReadResponses=false, onReadRequestFinish() is called + * synchronously before run() returns. 
+ */ + @Test + public void testNonThrottledReadCallsOnFinishSynchronously() throws Exception { + // sendResponse (non-throttle path) uses channel.isActive() and two-arg writeAndFlush + when(channel.isActive()).thenReturn(true); + when(channel.writeAndFlush(any(), any(ChannelPromise.class))).thenReturn(mock(ChannelPromise.class)); + + long ledgerId = System.currentTimeMillis(); + ReadRequest request = ReadRequest.create( + BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, 1, (short) 0, new byte[]{}); + ReadEntryProcessor processor = ReadEntryProcessor.create( + request, requestHandler, requestProcessor, null, false /* no throttle */); + + processor.run(); + + verify(channel, times(1)).writeAndFlush(any(), any(ChannelPromise.class)); + // onReadRequestFinish should have been called synchronously + verify(requestProcessor, times(1)).onReadRequestFinish(); + } + + /** + * Verify that maxReadsInProgressLimit defaults to 10000 (enabled), + * ensuring non-blocking read response writes are bounded by default. + */ + @Test + public void testDefaultMaxReadsInProgressLimitIsEnabled() { + ServerConfiguration conf = new ServerConfiguration(); + assertEquals("maxReadsInProgressLimit should default to 10000", + 10000, conf.getMaxReadsInProgressLimit()); + } + + /** + * Test that the read semaphore is held from request creation until the write future completes, + * not released when the read thread returns. This ensures that maxReadsInProgressLimit correctly + * bounds the number of read responses buffered in memory, even though the read thread is + * non-blocking. 
+ */ + @Test + public void testThrottledReadHoldsSemaphoreUntilWriteCompletes() throws Exception { + // Simulate maxReadsInProgressLimit=1 with a real semaphore + Semaphore readsSemaphore = new Semaphore(1); + + doAnswer(inv -> { + readsSemaphore.acquireUninterruptibly(); + return null; + }).when(requestProcessor).onReadRequestStart(any(Channel.class)); + doAnswer(inv -> { + readsSemaphore.release(); + return null; + }).when(requestProcessor).onReadRequestFinish(); + + // Setup non-event-loop thread + EventLoop eventLoop = mock(EventLoop.class); + when(eventLoop.inEventLoop()).thenReturn(false); + doAnswer(inv -> { + ((Runnable) inv.getArgument(0)).run(); + return null; + }).when(eventLoop).execute(any(Runnable.class)); + when(channel.eventLoop()).thenReturn(eventLoop); + + // Controllable write future + DefaultChannelPromise writeFuture = new DefaultChannelPromise(channel); + doAnswer(inv -> writeFuture).when(channel).writeAndFlush(any(Response.class)); + + long ledgerId = System.currentTimeMillis(); + ReadRequest request = ReadRequest.create( + BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, 1, (short) 0, new byte[]{}); + + // create() calls onReadRequestStart → semaphore acquired + ReadEntryProcessor processor = ReadEntryProcessor.create( + request, requestHandler, requestProcessor, null, true /* throttle */); + + // Semaphore should be acquired (1 permit used) + assertEquals("semaphore should have 0 permits after read started", + 0, readsSemaphore.availablePermits()); + + // Run the processor — thread returns immediately (non-blocking) + processor.run(); + + // Semaphore should STILL be held (write not completed) + assertEquals("semaphore should still have 0 permits while write is in progress", + 0, readsSemaphore.availablePermits()); + + // A second read would be unable to acquire the semaphore + assertFalse("second read should not be able to acquire semaphore", + readsSemaphore.tryAcquire()); + + // Complete the write + writeFuture.setSuccess(); + + // Now 
semaphore should be released — a new read can proceed + assertEquals("semaphore should have 1 permit after write completes", + 1, readsSemaphore.availablePermits()); + } }
