This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 31e1a57c41c KAFKA-18989 Optimize FileRecord#searchForOffsetWithSize (#19214)
31e1a57c41c is described below

commit 31e1a57c41cf9cb600751669dc71bcd9596b45f9
Author: Ken Huang <[email protected]>
AuthorDate: Thu Mar 20 16:33:35 2025 +0800

    KAFKA-18989 Optimize FileRecord#searchForOffsetWithSize (#19214)
    
    Computing `lastOffset` requires reading the entire batch header, so we
    should check `baseOffset` instead.
    
    To optimize this, we need to update the search logic. The previous
    approach simply checked whether each batch's `lastOffset()` was greater
    than or equal to the target offset. Once it found the first batch that
    met this condition, it returned that batch immediately.
    
    Now that we are using `baseOffset()`, we need to handle a special case:
    if the `targetOffset` falls between the `lastOffset` of the previous
    batch and the `baseOffset` of the matching batch, we should select the
    matching batch. The updated logic is structured as follows (a minimal
    Java sketch appears after the list):
    
    1. First, if `baseOffset` exactly equals `targetOffset`, return immediately.
    2. If we find the first batch with `baseOffset` greater than `targetOffset`:
        - If the previous batch contains the target, return the previous batch.
        - If there is no previous batch, or the previous batch doesn't contain
          the target, return the current batch.
    3. After iterating through all batches, check if the last batch contains
       the target offset.
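
    A minimal sketch of this loop in plain Java is shown below. The `Batch`
    interface here is a hypothetical stand-in for `FileChannelRecordBatch`;
    the real implementation is in the `FileRecords.java` diff further down.

    ```java
    // Hypothetical stand-in for FileChannelRecordBatch.
    interface Batch {
        long baseOffset(); // cheap: available without loading the batch header
        long lastOffset(); // expensive: loads the entire batch header
    }

    final class OffsetSearch {
        // Returns the batch containing targetOffset (or the batch just past a
        // gap), or null if the target is beyond the last batch.
        static Batch searchFromPosition(Iterable<Batch> batches, long targetOffset) {
            Batch prevBatch = null;
            for (Batch batch : batches) {
                if (batch.baseOffset() == targetOffset)
                    return batch; // exact match, no lastOffset() call needed
                if (batch.baseOffset() > targetOffset) {
                    // First batch starting past the target: either the previous
                    // batch contains it, or the target falls in the gap between them
                    if (prevBatch != null && prevBatch.lastOffset() >= targetOffset)
                        return prevBatch;
                    return batch;
                }
                prevBatch = batch;
            }
            // All batches start before the target; the last one may still contain it
            if (prevBatch != null && prevBatch.lastOffset() >= targetOffset)
                return prevBatch;
            return null;
        }
    }
    ```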
    
    This code path is not thread-safe, so we need to guard against
    `EOFException`. To avoid this exception, I still use an early return. In
    this scenario, `lastOffset` is still used within the loop, but it is
    invoked at most once per search, which in most cases is an optimization.
    
    Test: Verifying Memory Usage Improvement
    To evaluate whether this optimization helps, I followed the steps below
    to monitor memory usage:
    
    1. Start a Standalone Kafka Server
    ```sh
    KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
    bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/server.properties
    bin/kafka-server-start.sh config/server.properties
    ```
    
    2. Use Performance Console Tools to Produce and Consume Records
    
    **Produce Records:**
    ```sh
    ./bin/kafka-producer-perf-test.sh \
      --topic test-topic \
      --num-records 1000000000 \
      --record-size 100 \
      --throughput -1 \
      --producer-props bootstrap.servers=localhost:9092
    ```
    **Consume Records:**
    ```sh
    ./bin/kafka-consumer-perf-test.sh \
      --topic test-topic \
      --messages 1000000000 \
      --bootstrap-server localhost:9092
    ```
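    While the two perf tools run, broker heap usage can be sampled from a
    separate shell. The screenshots below do not state which monitor produced
    them, so the `jstat` command here is only an illustrative assumption:
    ```sh
    # Hypothetical monitoring step; any JVM monitor works. The broker's main
    # class is kafka.Kafka, which jcmd lists alongside its pid.
    BROKER_PID=$(jcmd -l | grep kafka.Kafka | cut -d' ' -f1)
    # Print GC/heap utilization every 5 seconds.
    jstat -gcutil "$BROKER_PID" 5000
    ```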
    It can be observed that memory usage has significantly decreased.
    trunk:
    ![CleanShot 2025-03-16 at 11 53 31@2x](https://github.com/user-attachments/assets/eec26b1d-38ed-41c8-8c49-e5c68643761b)
    this PR:
    ![CleanShot 2025-03-16 at 17 41 56@2x](https://github.com/user-attachments/assets/c8d4c234-18c2-4642-88ae-9f96cf54fccc)
    
    Reviewers: Kirk True <[email protected]>, TengYao Chi
    <[email protected]>, David Arthur <[email protected]>, Jun Rao
    <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../apache/kafka/common/record/FileRecords.java    |  38 +++-
 .../kafka/common/record/FileRecordsTest.java       | 207 ++++++++++++++++++++-
 .../kafka/storage/internals/log/LogSegment.java    |   2 +-
 .../storage/internals/log/LogSegmentTest.java      |   2 +-
 4 files changed, 236 insertions(+), 13 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index c905a4678a9..f70ee032346 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -292,17 +292,43 @@ public class FileRecords extends AbstractRecords implements Closeable {
     /**
      * Search forward for the file position of the message batch whose last offset that is greater
      * than or equal to the target offset. If no such batch is found, return null.
+     * <p>
+     * The following logic is intentionally designed to minimize memory usage
+     * by avoiding unnecessary calls to {@link FileChannelRecordBatch#lastOffset()} for every batch.
+     * Instead, we use {@link FileChannelRecordBatch#baseOffset()} comparisons when possible, and only
+     * check {@link FileChannelRecordBatch#lastOffset()} when absolutely necessary.
      *
      * @param targetOffset The offset to search for.
      * @param startingPosition The starting position in the file to begin searching from.
      * @return the batch's base offset, its physical position, and its size (including log overhead)
      */
-    public LogOffsetPosition searchForOffsetWithSize(long targetOffset, int startingPosition) {
+    public LogOffsetPosition searchForOffsetFromPosition(long targetOffset, int startingPosition) {
+        FileChannelRecordBatch prevBatch = null;
+
         for (FileChannelRecordBatch batch : batchesFrom(startingPosition)) {
-            long offset = batch.lastOffset();
-            if (offset >= targetOffset)
-                return new LogOffsetPosition(batch.baseOffset(), batch.position(), batch.sizeInBytes());
+            // If baseOffset exactly equals targetOffset, return immediately
+            if (batch.baseOffset() == targetOffset) {
+                return LogOffsetPosition.fromBatch(batch);
+            }
+            
+            // If we find the first batch with baseOffset greater than targetOffset
+            if (batch.baseOffset() > targetOffset) {
+                // If the previous batch contains the target
+                if (prevBatch != null && prevBatch.lastOffset() >= targetOffset)
+                    return LogOffsetPosition.fromBatch(prevBatch);
+                else {
+                    // If there's no previous batch or the previous batch doesn't contain the
+                    // target, return the current batch
+                    return LogOffsetPosition.fromBatch(batch);
+                }
+            }
+            prevBatch = batch;
         }
+        // Only one case would reach here: all batches have baseOffset less than targetOffset
+        // Check if the last batch contains the target
+        if (prevBatch != null && prevBatch.lastOffset() >= targetOffset) 
+            return LogOffsetPosition.fromBatch(prevBatch);
+
         return null;
     }
 
@@ -463,6 +489,10 @@ public class FileRecords extends AbstractRecords implements Closeable {
         public final long offset;
         public final int position;
         public final int size;
+        
+        public static LogOffsetPosition fromBatch(FileChannelRecordBatch batch) {
+            return new LogOffsetPosition(batch.baseOffset(), batch.position(), batch.sizeInBytes());
+        }
 
         public LogOffsetPosition(long offset, int position, int size) {
             this.offset = offset;
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index 7574609ab99..a2e89d3f4c6 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -26,6 +26,8 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.Mockito;
 
 import java.io.File;
@@ -51,10 +53,13 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -248,25 +253,25 @@ public class FileRecordsTest {
 
         int message1Size = batches.get(0).sizeInBytes();
         assertEquals(new FileRecords.LogOffsetPosition(0L, position, message1Size),
-            fileRecords.searchForOffsetWithSize(0, 0),
+            fileRecords.searchForOffsetFromPosition(0, 0),
             "Should be able to find the first message by its offset");
         position += message1Size;
 
         int message2Size = batches.get(1).sizeInBytes();
         assertEquals(new FileRecords.LogOffsetPosition(1L, position, message2Size),
-            fileRecords.searchForOffsetWithSize(1, 0),
+            fileRecords.searchForOffsetFromPosition(1, 0),
             "Should be able to find second message when starting from 0");
         assertEquals(new FileRecords.LogOffsetPosition(1L, position, message2Size),
-            fileRecords.searchForOffsetWithSize(1, position),
+            fileRecords.searchForOffsetFromPosition(1, position),
             "Should be able to find second message starting from its offset");
         position += message2Size + batches.get(2).sizeInBytes();
 
         int message4Size = batches.get(3).sizeInBytes();
         assertEquals(new FileRecords.LogOffsetPosition(50L, position, message4Size),
-            fileRecords.searchForOffsetWithSize(3, position),
+            fileRecords.searchForOffsetFromPosition(3, position),
             "Should be able to find fourth message from a non-existent offset");
         assertEquals(new FileRecords.LogOffsetPosition(50L, position, message4Size),
-            fileRecords.searchForOffsetWithSize(50,  position),
+            fileRecords.searchForOffsetFromPosition(50,  position),
             "Should be able to find fourth message by correct offset");
     }
 
@@ -276,7 +281,7 @@ public class FileRecordsTest {
     @Test
     public void testIteratorWithLimits() throws IOException {
         RecordBatch batch = batches(fileRecords).get(1);
-        int start = fileRecords.searchForOffsetWithSize(1, 0).position;
+        int start = fileRecords.searchForOffsetFromPosition(1, 0).position;
         int size = batch.sizeInBytes();
         FileRecords slice = fileRecords.slice(start, size);
         assertEquals(Collections.singletonList(batch), batches(slice));
@@ -290,7 +295,7 @@ public class FileRecordsTest {
     @Test
     public void testTruncate() throws IOException {
         RecordBatch batch = batches(fileRecords).get(0);
-        int end = fileRecords.searchForOffsetWithSize(1, 0).position;
+        int end = fileRecords.searchForOffsetFromPosition(1, 0).position;
         fileRecords.truncateTo(end);
         assertEquals(Collections.singletonList(batch), batches(fileRecords));
         assertEquals(batch.sizeInBytes(), fileRecords.sizeInBytes());
@@ -518,6 +523,194 @@ public class FileRecordsTest {
         verify(channel).transferFrom(any(), anyLong(), eq((long) size - firstWritten));
     }
 
+    /**
+     * Test two conditions:
+     * 1. If the target offset equals the base offset of the first batch
+     * 2. If the target offset is less than the base offset of the first batch
+     * <p>
+     * If the base offset of the first batch is equal to or greater than the target offset, it should return the
+     * position of the first batch and the lastOffset method should not be called.
+     */
+    @ParameterizedTest
+    @ValueSource(longs = {5, 10})
+    public void testSearchForOffsetFromPosition1(long baseOffset) throws IOException {
+        File mockFile = mock(File.class);
+        FileChannel mockChannel = mock(FileChannel.class);
+        FileLogInputStream.FileChannelRecordBatch batch = mock(FileLogInputStream.FileChannelRecordBatch.class);
+        when(batch.baseOffset()).thenReturn(baseOffset);
+
+        FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false));
+        mockFileRecordBatches(fileRecords, batch);
+
+        FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetFromPosition(5L, 0);
+
+        assertEquals(FileRecords.LogOffsetPosition.fromBatch(batch), result);
+        verify(batch, never()).lastOffset();
+    }
+
+    /**
+     * Test the case when the target offset equals the last offset of the first batch.
+     */
+    @Test
+    public void testSearchForOffsetFromPosition2() throws IOException {
+        File mockFile = mock(File.class);
+        FileChannel mockChannel = mock(FileChannel.class);
+        FileLogInputStream.FileChannelRecordBatch batch = mock(FileLogInputStream.FileChannelRecordBatch.class);
+        when(batch.baseOffset()).thenReturn(3L);
+        when(batch.lastOffset()).thenReturn(5L);
+
+        FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false));
+        mockFileRecordBatches(fileRecords, batch);
+
+        FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetFromPosition(5L, 0);
+
+        assertEquals(FileRecords.LogOffsetPosition.fromBatch(batch), result);
+        // target is equal to the last offset of the batch, we should call lastOffset
+        verify(batch, times(1)).lastOffset();
+    }
+
+    /**
+     * Test the case when the target offset equals the last offset of the last batch.
+     */
+    @Test
+    public void testSearchForOffsetFromPosition3() throws IOException {
+        File mockFile = mock(File.class);
+        FileChannel mockChannel = mock(FileChannel.class);
+        FileLogInputStream.FileChannelRecordBatch prevBatch = mock(FileLogInputStream.FileChannelRecordBatch.class);
+        when(prevBatch.baseOffset()).thenReturn(5L);
+        when(prevBatch.lastOffset()).thenReturn(12L);
+        FileLogInputStream.FileChannelRecordBatch currentBatch = mock(FileLogInputStream.FileChannelRecordBatch.class);
+        when(currentBatch.baseOffset()).thenReturn(15L);
+        when(currentBatch.lastOffset()).thenReturn(20L);
+
+        FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false));
+        mockFileRecordBatches(fileRecords, prevBatch, currentBatch);
+
+        FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetFromPosition(20L, 0);
+
+        assertEquals(FileRecords.LogOffsetPosition.fromBatch(currentBatch), result);
+        // Because the target offset is in the current batch, we should not call lastOffset in the previous batch
+        verify(prevBatch, never()).lastOffset();
+        verify(currentBatch, times(1)).lastOffset();
+    }
+
+    /**
+     * Test the case when the target offset is within the range of the previous batch.
+     */
+    @Test
+    public void testSearchForOffsetFromPosition4() throws IOException {
+        File mockFile = mock(File.class);
+        FileChannel mockChannel = mock(FileChannel.class);
+        FileLogInputStream.FileChannelRecordBatch prevBatch = mock(FileLogInputStream.FileChannelRecordBatch.class);
+        when(prevBatch.baseOffset()).thenReturn(5L);
+        when(prevBatch.lastOffset()).thenReturn(12L); // > targetOffset
+        FileLogInputStream.FileChannelRecordBatch currentBatch = mock(FileLogInputStream.FileChannelRecordBatch.class);
+        when(currentBatch.baseOffset()).thenReturn(15L); // >= targetOffset
+
+        FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false));
+        mockFileRecordBatches(fileRecords, prevBatch, currentBatch);
+
+        FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetFromPosition(10L, 0);
+
+        assertEquals(FileRecords.LogOffsetPosition.fromBatch(prevBatch), result);
+        // Because the target offset is in the previous batch, we should call
+        // lastOffset on the previous batch
+        verify(prevBatch, times(1)).lastOffset();
+    }
+
+    /**
+     * Test the case when no batch matches the target offset.
+     */
+    @Test
+    public void testSearchForOffsetFromPosition5() throws IOException {
+        File mockFile = mock(File.class);
+        FileChannel mockChannel = mock(FileChannel.class);
+        FileLogInputStream.FileChannelRecordBatch batch1 = mock(FileLogInputStream.FileChannelRecordBatch.class);
+        when(batch1.baseOffset()).thenReturn(5L);  // < targetOffset
+        FileLogInputStream.FileChannelRecordBatch batch2 = mock(FileLogInputStream.FileChannelRecordBatch.class);
+        when(batch2.baseOffset()).thenReturn(8L);  // < targetOffset
+        when(batch2.lastOffset()).thenReturn(9L);  // < targetOffset
+
+        FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false));
+        mockFileRecordBatches(fileRecords, batch1, batch2);
+
+        FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetFromPosition(10L, 0);
+
+        assertNull(result);
+        // Because the target offset exceeds the last offset of batch2,
+        // we should call lastOffset on batch2
+        verify(batch1, never()).lastOffset();
+        verify(batch2, times(1)).lastOffset();
+    }
+
+    /**
+     * Test two conditions:
+     * 1. If the target offset is less than the base offset of the last batch
+     * 2. If the target offset equals the base offset of the last batch
+     */
+    @ParameterizedTest
+    @ValueSource(longs = {8, 10})
+    public void testSearchForOffsetFromPosition6(long baseOffset) throws IOException {
+        File mockFile = mock(File.class);
+        FileChannel mockChannel = mock(FileChannel.class);
+        FileLogInputStream.FileChannelRecordBatch batch1 = mock(FileLogInputStream.FileChannelRecordBatch.class);
+        when(batch1.baseOffset()).thenReturn(5L);  // < targetOffset
+        FileLogInputStream.FileChannelRecordBatch batch2 = mock(FileLogInputStream.FileChannelRecordBatch.class);
+        when(batch2.baseOffset()).thenReturn(baseOffset);  // < targetOffset or == targetOffset
+        when(batch2.lastOffset()).thenReturn(12L); // >= targetOffset
+
+        FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false));
+        mockFileRecordBatches(fileRecords, batch1, batch2);
+
+        long targetOffset = 10L;
+        FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetFromPosition(targetOffset, 0);
+
+        assertEquals(FileRecords.LogOffsetPosition.fromBatch(batch2), result);
+        if (targetOffset == baseOffset) {
+            // Because the target offset is equal to the base offset of the batch2, we should not call
+            // lastOffset on batch2 and batch1
+            verify(batch1, never()).lastOffset();
+            verify(batch2, never()).lastOffset();
+        } else {
+            // Because the target offset is in the batch2, we should not call
+            // lastOffset on batch1
+            verify(batch1, never()).lastOffset();
+            verify(batch2, times(1)).lastOffset();
+        }
+    }
+
+    /**
+     * Test the case when the target offset is between two batches.
+     */
+    @Test
+    public void testSearchForOffsetFromPosition7() throws IOException {
+        File mockFile = mock(File.class);
+        FileChannel mockChannel = mock(FileChannel.class);
+        FileLogInputStream.FileChannelRecordBatch batch1 = mock(FileLogInputStream.FileChannelRecordBatch.class);
+        when(batch1.baseOffset()).thenReturn(5L);
+        when(batch1.lastOffset()).thenReturn(10L);
+        FileLogInputStream.FileChannelRecordBatch batch2 = mock(FileLogInputStream.FileChannelRecordBatch.class);
+        when(batch2.baseOffset()).thenReturn(15L);
+        when(batch2.lastOffset()).thenReturn(20L);
+
+        FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false));
+        mockFileRecordBatches(fileRecords, batch1, batch2);
+
+        FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetFromPosition(13L, 0);
+
+        assertEquals(FileRecords.LogOffsetPosition.fromBatch(batch2), result);
+        // Because the target offset is between the two batches, we should call lastOffset on the batch1
+        verify(batch1, times(1)).lastOffset();
+        verify(batch2, never()).lastOffset();
+    }
+
+    private void mockFileRecordBatches(FileRecords fileRecords, FileLogInputStream.FileChannelRecordBatch... batch) {
+        List<FileLogInputStream.FileChannelRecordBatch> batches = asList(batch);
+        doReturn((Iterable<FileLogInputStream.FileChannelRecordBatch>) batches::iterator)
+                .when(fileRecords)
+                .batchesFrom(anyInt());
+    }
+
     private void doTestConversion(Compression compression, byte toMagic) throws IOException {
         List<Long> offsets = asList(0L, 2L, 3L, 9L, 11L, 15L, 16L, 17L, 22L, 24L);
 
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
index 2710b59d2b6..6f11b796e35 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java
@@ -381,7 +381,7 @@ public class LogSegment implements Closeable {
      */
     LogOffsetPosition translateOffset(long offset, int startingFilePosition) throws IOException {
         OffsetPosition mapping = offsetIndex().lookup(offset);
-        return log.searchForOffsetWithSize(offset, Math.max(mapping.position, startingFilePosition));
+        return log.searchForOffsetFromPosition(offset, Math.max(mapping.position, startingFilePosition));
     }
 
     /**
diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java
index 9baa9ff394e..9ad98cb577e 100644
--- a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java
+++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java
@@ -600,7 +600,7 @@ public class LogSegmentTest {
                 int offsetToBeginCorruption = TestUtils.RANDOM.nextInt(messagesAppended);
                 // start corrupting somewhere in the middle of the chosen record all the way to the end
 
-                FileRecords.LogOffsetPosition recordPosition = seg.log().searchForOffsetWithSize(offsetToBeginCorruption, 0);
+                FileRecords.LogOffsetPosition recordPosition = seg.log().searchForOffsetFromPosition(offsetToBeginCorruption, 0);
                 int position = recordPosition.position + TestUtils.RANDOM.nextInt(15);
                 writeNonsenseToFile(seg.log().file(), position, (int) (seg.log().file().length() - position));
                 seg.recover(newProducerStateManager(), mock(LeaderEpochFileCache.class));
