This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new dfa5aa5484a KAFKA-18022: fetchOffsetMetadata handling for minBytes estimation in both common/uncommon cases of share fetch (#17825)
dfa5aa5484a is described below

commit dfa5aa5484a12e18cc763e064a70391103524165
Author: Abhinav Dixit <[email protected]>
AuthorDate: Sat Nov 16 20:56:30 2024 +0530

    KAFKA-18022: fetchOffsetMetadata handling for minBytes estimation in both common/uncommon cases of share fetch (#17825)
    
    Reviewers: Jun Rao <[email protected]>
---
 .../java/kafka/server/share/DelayedShareFetch.java | 21 ++++----
 .../java/kafka/server/share/SharePartition.java    | 59 +++++++++++++++-------
 .../kafka/server/share/DelayedShareFetchTest.java  |  7 +--
 .../server/share/SharePartitionManagerTest.java    |  7 +--
 4 files changed, 60 insertions(+), 34 deletions(-)
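
    The gist of the change, shown here as a minimal standalone sketch (illustrative names only,
    not the actual SharePartition/DelayedShareFetch API): the cached LogOffsetMetadata is now
    keyed by the fetch offset it was recorded for, so a lookup at a different fetch offset
    returns empty and forces a fresh readFromLog, instead of relying on an explicit reset every
    time the end offset moves.

    import java.util.Optional;

    // Hypothetical helper mirroring the new OffsetMetadata cache; "Object" stands in
    // for LogOffsetMetadata so the sketch stays self-contained.
    final class FetchOffsetMetadataCache {
        private long offset = -1;          // fetch offset the metadata was cached for
        private Object offsetMetadata;     // last known log offset metadata

        synchronized void update(long fetchOffset, Object metadata) {
            this.offset = fetchOffset;
            this.offsetMetadata = metadata;
        }

        synchronized Optional<Object> get(long fetchOffset) {
            // A moved fetch offset (or no cached value yet) yields empty, signalling
            // the caller to read offset metadata from the log again.
            if (offsetMetadata == null || offset != fetchOffset)
                return Optional.empty();
            return Optional.of(offsetMetadata);
        }
    }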

diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
index 9f1ad3ef651..476b309a1eb 100644
--- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
+++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -90,7 +90,7 @@ public class DelayedShareFetch extends DelayedOperation {
      */
     @Override
     public void onComplete() {
-        // We are utilizing lock so that onComplete doesn't do a dirty read for global variables -
+        // We are utilizing lock so that onComplete doesn't do a dirty read for instance variables -
         // partitionsAcquired and partitionsAlreadyFetched, since these variables can get updated in a different tryComplete thread.
         lock.lock();
         log.trace("Completing the delayed share fetch request for group {}, 
member {}, "
@@ -165,7 +165,7 @@ public class DelayedShareFetch extends DelayedOperation {
                 // replicaManager.readFromLog to populate the offset metadata and update the fetch offset metadata for
                 // those topic partitions.
                 LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse = maybeReadFromLog(topicPartitionData);
-                maybeUpdateFetchOffsetMetadata(replicaManagerReadResponse);
+                maybeUpdateFetchOffsetMetadata(topicPartitionData, replicaManagerReadResponse);
                 if (anyPartitionHasLogReadError(replicaManagerReadResponse) || isMinBytesSatisfied(topicPartitionData)) {
                     partitionsAcquired = topicPartitionData;
                     partitionsAlreadyFetched = replicaManagerReadResponse;
@@ -239,21 +239,22 @@ public class DelayedShareFetch extends DelayedOperation {
     }
 
     private LinkedHashMap<TopicIdPartition, LogReadResult> maybeReadFromLog(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
-        LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> partitionsMissingFetchOffsetMetadata = new LinkedHashMap<>();
+        LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> partitionsNotMatchingFetchOffsetMetadata = new LinkedHashMap<>();
         topicPartitionData.forEach((topicIdPartition, partitionData) -> {
             SharePartition sharePartition = sharePartitions.get(topicIdPartition);
-            if (sharePartition.fetchOffsetMetadata().isEmpty()) {
-                partitionsMissingFetchOffsetMetadata.put(topicIdPartition, partitionData);
+            if (sharePartition.fetchOffsetMetadata(partitionData.fetchOffset).isEmpty()) {
+                partitionsNotMatchingFetchOffsetMetadata.put(topicIdPartition, partitionData);
             }
         });
-        if (partitionsMissingFetchOffsetMetadata.isEmpty()) {
+        if (partitionsNotMatchingFetchOffsetMetadata.isEmpty()) {
             return new LinkedHashMap<>();
         }
         // We fetch data from replica manager corresponding to the topic partitions that have missing fetch offset metadata.
-        return readFromLog(partitionsMissingFetchOffsetMetadata);
+        return readFromLog(partitionsNotMatchingFetchOffsetMetadata);
     }
 
     private void maybeUpdateFetchOffsetMetadata(
+        LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData,
         LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponseData) {
         for (Map.Entry<TopicIdPartition, LogReadResult> entry : replicaManagerReadResponseData.entrySet()) {
             TopicIdPartition topicIdPartition = entry.getKey();
@@ -264,7 +265,9 @@ public class DelayedShareFetch extends DelayedOperation {
                     replicaManagerLogReadResult, topicIdPartition);
                 continue;
             }
-            sharePartition.updateFetchOffsetMetadata(Optional.of(replicaManagerLogReadResult.info().fetchOffsetMetadata));
+            sharePartition.updateFetchOffsetMetadata(
+                topicPartitionData.get(topicIdPartition).fetchOffset,
+                replicaManagerLogReadResult.info().fetchOffsetMetadata);
         }
     }
 
@@ -290,7 +293,7 @@ public class DelayedShareFetch extends DelayedOperation {
 
             SharePartition sharePartition = sharePartitions.get(topicIdPartition);
 
-            Optional<LogOffsetMetadata> optionalFetchOffsetMetadata = sharePartition.fetchOffsetMetadata();
+            Optional<LogOffsetMetadata> optionalFetchOffsetMetadata = sharePartition.fetchOffsetMetadata(partitionData.fetchOffset);
             if (optionalFetchOffsetMetadata.isEmpty() || optionalFetchOffsetMetadata.get() == LogOffsetMetadata.UNKNOWN_OFFSET_METADATA)
                 continue;
             LogOffsetMetadata fetchOffsetMetadata = optionalFetchOffsetMetadata.get();
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java
index 632cb1e3169..ddc023a5315 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -280,9 +280,9 @@ public class SharePartition {
     private long endOffset;
 
     /**
-     * We maintain the latest fetch offset metadata to estimate the minBytes requirement more efficiently.
+     * We maintain the latest fetch offset and its metadata to estimate the minBytes requirement more efficiently.
      */
-    private Optional<LogOffsetMetadata> fetchOffsetMetadata;
+    private final OffsetMetadata fetchOffsetMetadata;
 
     /**
      * The state epoch is used to track the version of the state of the share partition.
@@ -347,6 +347,7 @@ public class SharePartition {
         this.partitionState = sharePartitionState;
         this.replicaManager = replicaManager;
         this.groupConfigManager = groupConfigManager;
+        this.fetchOffsetMetadata = new OffsetMetadata();
     }
 
     /**
@@ -451,12 +452,12 @@ public class SharePartition {
                     // If the cachedState is not empty, findNextFetchOffset flag is set to true so that any AVAILABLE records
                     // in the cached state are not missed
                     findNextFetchOffset.set(true);
-                    updateEndOffsetAndResetFetchOffsetMetadata(cachedState.lastEntry().getValue().lastOffset());
+                    endOffset = cachedState.lastEntry().getValue().lastOffset();
                     // In case the persister read state RPC result contains no AVAILABLE records, we can update cached state
                     // and start/end offsets.
                     maybeUpdateCachedStateAndOffsets();
                 } else {
-                    updateEndOffsetAndResetFetchOffsetMetadata(startOffset);
+                    endOffset = startOffset;
                 }
                 // Set the partition state to Active and complete the future.
                 partitionState = SharePartitionState.ACTIVE;
@@ -943,7 +944,7 @@ public class SharePartition {
                 // If the cached state is empty, then the start and end offset will be the new log start offset.
                 // This can occur during the initialization of share partition if LSO has moved.
                 startOffset = logStartOffset;
-                updateEndOffsetAndResetFetchOffsetMetadata(logStartOffset);
+                endOffset = logStartOffset;
                 return;
             }
 
@@ -961,7 +962,7 @@ public class SharePartition {
                 // This case means that the cached state is completely fresh now.
                 // Example scenario - batch of 0-10 in acquired state in cached state, then LSO moves to 15,
                 // then endOffset should be 15 as well.
-                updateEndOffsetAndResetFetchOffsetMetadata(startOffset);
+                endOffset = startOffset;
             }
 
             // Note -
@@ -1192,7 +1193,7 @@ public class SharePartition {
             if (cachedState.firstKey() == firstAcquiredOffset)  {
                 startOffset = firstAcquiredOffset;
             }
-            updateEndOffsetAndResetFetchOffsetMetadata(lastAcquiredOffset);
+            endOffset = lastAcquiredOffset;
             return new AcquiredRecords()
                 .setFirstOffset(firstAcquiredOffset)
                 .setLastOffset(lastAcquiredOffset)
@@ -1592,27 +1593,21 @@ public class SharePartition {
         return Optional.empty();
     }
 
-    // The caller of this function is expected to hold lock.writeLock() when calling this method.
-    protected void updateEndOffsetAndResetFetchOffsetMetadata(long updatedEndOffset) {
-        endOffset = updatedEndOffset;
-        fetchOffsetMetadata = Optional.empty();
-    }
-
-    protected void updateFetchOffsetMetadata(Optional<LogOffsetMetadata> fetchOffsetMetadata) {
+    protected void updateFetchOffsetMetadata(long nextFetchOffset, LogOffsetMetadata logOffsetMetadata) {
         lock.writeLock().lock();
         try {
-            this.fetchOffsetMetadata = fetchOffsetMetadata;
+            fetchOffsetMetadata.updateOffsetMetadata(nextFetchOffset, logOffsetMetadata);
         } finally {
             lock.writeLock().unlock();
         }
     }
 
-    protected Optional<LogOffsetMetadata> fetchOffsetMetadata() {
+    protected Optional<LogOffsetMetadata> fetchOffsetMetadata(long nextFetchOffset) {
         lock.readLock().lock();
         try {
-            if (findNextFetchOffset.get())
+            if (fetchOffsetMetadata.offsetMetadata() == null || fetchOffsetMetadata.offset() != nextFetchOffset)
                 return Optional.empty();
-            return fetchOffsetMetadata;
+            return Optional.of(fetchOffsetMetadata.offsetMetadata());
         } finally {
             lock.readLock().unlock();
         }
@@ -1696,7 +1691,7 @@ public class SharePartition {
             long lastCachedOffset = cachedState.lastEntry().getValue().lastOffset();
             if (lastOffsetAcknowledged == lastCachedOffset) {
                 startOffset = lastCachedOffset + 1; // The next offset that will be fetched and acquired in the share partition
-                updateEndOffsetAndResetFetchOffsetMetadata(lastCachedOffset + 1);
+                endOffset = lastCachedOffset + 1;
                 cachedState.clear();
                 // Nothing further to do.
                 return;
@@ -2426,4 +2421,30 @@ public class SharePartition {
                 ")";
         }
     }
+
+    /**
+     * FetchOffsetMetadata class is used to cache offset and its log metadata.
+     */
+    static final class OffsetMetadata {
+        // This offset could be different from offsetMetadata.messageOffset if it's in the middle of a batch.
+        private long offset;
+        private LogOffsetMetadata offsetMetadata;
+
+        OffsetMetadata() {
+            offset = -1;
+        }
+
+        long offset() {
+            return offset;
+        }
+
+        LogOffsetMetadata offsetMetadata() {
+            return offsetMetadata;
+        }
+
+        void updateOffsetMetadata(long offset, LogOffsetMetadata offsetMetadata) {
+            this.offset = offset;
+            this.offsetMetadata = offsetMetadata;
+        }
+    }
 }
diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
index 12b89cffd37..6ef67afb105 100644
--- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
+++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
@@ -65,6 +65,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
@@ -165,7 +166,7 @@ public class DelayedShareFetchTest {
         // We are testing the case when the share partition is getting fetched for the first time, so for the first time
         // the fetchOffsetMetadata will return empty. Post the readFromLog call, the fetchOffsetMetadata will be
         // populated for the share partition, which has 1 as the positional difference, so it doesn't satisfy the minBytes(2).
-        when(sp0.fetchOffsetMetadata())
+        when(sp0.fetchOffsetMetadata(anyLong()))
             .thenReturn(Optional.empty())
             .thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0)));
         LogOffsetMetadata hwmOffsetMetadata = new LogOffsetMetadata(1, 1, 1);
@@ -223,7 +224,7 @@ public class DelayedShareFetchTest {
         // functionality to give the file position difference as 1 byte, so it doesn't satisfy the minBytes(2).
         LogOffsetMetadata hwmOffsetMetadata = mock(LogOffsetMetadata.class);
         when(hwmOffsetMetadata.positionDiff(any())).thenReturn(1);
-        when(sp0.fetchOffsetMetadata()).thenReturn(Optional.of(mock(LogOffsetMetadata.class)));
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(mock(LogOffsetMetadata.class)));
         mockTopicIdPartitionFetchBytes(replicaManager, tp0, hwmOffsetMetadata);
 
         DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
@@ -271,7 +272,7 @@ public class DelayedShareFetchTest {
             ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
         doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
 
-        when(sp0.fetchOffsetMetadata()).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0)));
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0)));
         mockTopicIdPartitionToReturnDataEqualToMinBytes(replicaManager, tp0, 1);
         DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
             .withShareFetchData(shareFetch)
diff --git a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index 06bb123f550..eea0a75c56c 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -123,6 +123,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.atMost;
@@ -1677,7 +1678,7 @@ public class SharePartitionManagerTest {
                 "TestShareFetch", mockTimer, 
mockReplicaManager.localBrokerId(),
                 DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
         mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory);
-        when(sp1.fetchOffsetMetadata()).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0)));
+        when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0)));
         mockTopicIdPartitionToReturnDataEqualToMinBytes(mockReplicaManager, tp1, 2);
 
         // Initially you cannot acquire records for both sp1 and sp2.
@@ -1877,7 +1878,7 @@ public class SharePartitionManagerTest {
                 "TestShareFetch", mockTimer, 
mockReplicaManager.localBrokerId(),
                 DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
         mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory);
-        when(sp1.fetchOffsetMetadata()).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0)));
+        when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0)));
         mockTopicIdPartitionToReturnDataEqualToMinBytes(mockReplicaManager, tp1, 1);
 
         // Initially you cannot acquire records for both sp1 and sp2.
@@ -2363,7 +2364,7 @@ public class SharePartitionManagerTest {
             "TestShareFetch", mockTimer, replicaManager.localBrokerId(),
             DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
         mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory);
-        when(sp1.fetchOffsetMetadata()).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0)));
+        when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0)));
         mockTopicIdPartitionToReturnDataEqualToMinBytes(replicaManager, tp1, 1);
 
         doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp1))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
