This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new dfa5aa5484a KAFKA-18022: fetchOffsetMetadata handling for minBytes
estimation in both common/uncommon cases of share fetch (#17825)
dfa5aa5484a is described below
commit dfa5aa5484a12e18cc763e064a70391103524165
Author: Abhinav Dixit <[email protected]>
AuthorDate: Sat Nov 16 20:56:30 2024 +0530
KAFKA-18022: fetchOffsetMetadata handling for minBytes estimation in both
common/uncommon cases of share fetch (#17825)
Reviewers: Jun Rao <[email protected]>
---
.../java/kafka/server/share/DelayedShareFetch.java | 21 ++++----
.../java/kafka/server/share/SharePartition.java | 59 +++++++++++++++-------
.../kafka/server/share/DelayedShareFetchTest.java | 7 +--
.../server/share/SharePartitionManagerTest.java | 7 +--
4 files changed, 60 insertions(+), 34 deletions(-)
diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
index 9f1ad3ef651..476b309a1eb 100644
--- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
+++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -90,7 +90,7 @@ public class DelayedShareFetch extends DelayedOperation {
*/
@Override
public void onComplete() {
- // We are utilizing lock so that onComplete doesn't do a dirty read
for global variables -
+ // We are utilizing lock so that onComplete doesn't do a dirty read
for instance variables -
// partitionsAcquired and partitionsAlreadyFetched, since these
variables can get updated in a different tryComplete thread.
lock.lock();
log.trace("Completing the delayed share fetch request for group {},
member {}, "
@@ -165,7 +165,7 @@ public class DelayedShareFetch extends DelayedOperation {
// replicaManager.readFromLog to populate the offset metadata
and update the fetch offset metadata for
// those topic partitions.
LinkedHashMap<TopicIdPartition, LogReadResult>
replicaManagerReadResponse = maybeReadFromLog(topicPartitionData);
- maybeUpdateFetchOffsetMetadata(replicaManagerReadResponse);
+ maybeUpdateFetchOffsetMetadata(topicPartitionData,
replicaManagerReadResponse);
if (anyPartitionHasLogReadError(replicaManagerReadResponse) ||
isMinBytesSatisfied(topicPartitionData)) {
partitionsAcquired = topicPartitionData;
partitionsAlreadyFetched = replicaManagerReadResponse;
@@ -239,21 +239,22 @@ public class DelayedShareFetch extends DelayedOperation {
}
private LinkedHashMap<TopicIdPartition, LogReadResult>
maybeReadFromLog(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData>
topicPartitionData) {
- LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData>
partitionsMissingFetchOffsetMetadata = new LinkedHashMap<>();
+ LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData>
partitionsNotMatchingFetchOffsetMetadata = new LinkedHashMap<>();
topicPartitionData.forEach((topicIdPartition, partitionData) -> {
SharePartition sharePartition =
sharePartitions.get(topicIdPartition);
- if (sharePartition.fetchOffsetMetadata().isEmpty()) {
- partitionsMissingFetchOffsetMetadata.put(topicIdPartition,
partitionData);
+ if
(sharePartition.fetchOffsetMetadata(partitionData.fetchOffset).isEmpty()) {
+ partitionsNotMatchingFetchOffsetMetadata.put(topicIdPartition,
partitionData);
}
});
- if (partitionsMissingFetchOffsetMetadata.isEmpty()) {
+ if (partitionsNotMatchingFetchOffsetMetadata.isEmpty()) {
return new LinkedHashMap<>();
}
// We fetch data from replica manager corresponding to the topic
partitions that have missing fetch offset metadata.
- return readFromLog(partitionsMissingFetchOffsetMetadata);
+ return readFromLog(partitionsNotMatchingFetchOffsetMetadata);
}
private void maybeUpdateFetchOffsetMetadata(
+ LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData>
topicPartitionData,
LinkedHashMap<TopicIdPartition, LogReadResult>
replicaManagerReadResponseData) {
for (Map.Entry<TopicIdPartition, LogReadResult> entry :
replicaManagerReadResponseData.entrySet()) {
TopicIdPartition topicIdPartition = entry.getKey();
@@ -264,7 +265,9 @@ public class DelayedShareFetch extends DelayedOperation {
replicaManagerLogReadResult, topicIdPartition);
continue;
}
-
sharePartition.updateFetchOffsetMetadata(Optional.of(replicaManagerLogReadResult.info().fetchOffsetMetadata));
+ sharePartition.updateFetchOffsetMetadata(
+ topicPartitionData.get(topicIdPartition).fetchOffset,
+ replicaManagerLogReadResult.info().fetchOffsetMetadata);
}
}
@@ -290,7 +293,7 @@ public class DelayedShareFetch extends DelayedOperation {
SharePartition sharePartition =
sharePartitions.get(topicIdPartition);
- Optional<LogOffsetMetadata> optionalFetchOffsetMetadata =
sharePartition.fetchOffsetMetadata();
+ Optional<LogOffsetMetadata> optionalFetchOffsetMetadata =
sharePartition.fetchOffsetMetadata(partitionData.fetchOffset);
if (optionalFetchOffsetMetadata.isEmpty() ||
optionalFetchOffsetMetadata.get() == LogOffsetMetadata.UNKNOWN_OFFSET_METADATA)
continue;
LogOffsetMetadata fetchOffsetMetadata =
optionalFetchOffsetMetadata.get();
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java
b/core/src/main/java/kafka/server/share/SharePartition.java
index 632cb1e3169..ddc023a5315 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -280,9 +280,9 @@ public class SharePartition {
private long endOffset;
/**
- * We maintain the latest fetch offset metadata to estimate the minBytes
requirement more efficiently.
+ * We maintain the latest fetch offset and its metadata to estimate the
minBytes requirement more efficiently.
*/
- private Optional<LogOffsetMetadata> fetchOffsetMetadata;
+ private final OffsetMetadata fetchOffsetMetadata;
/**
* The state epoch is used to track the version of the state of the share
partition.
@@ -347,6 +347,7 @@ public class SharePartition {
this.partitionState = sharePartitionState;
this.replicaManager = replicaManager;
this.groupConfigManager = groupConfigManager;
+ this.fetchOffsetMetadata = new OffsetMetadata();
}
/**
@@ -451,12 +452,12 @@ public class SharePartition {
// If the cachedState is not empty, findNextFetchOffset
flag is set to true so that any AVAILABLE records
// in the cached state are not missed
findNextFetchOffset.set(true);
-
updateEndOffsetAndResetFetchOffsetMetadata(cachedState.lastEntry().getValue().lastOffset());
+ endOffset =
cachedState.lastEntry().getValue().lastOffset();
// In case the persister read state RPC result contains no
AVAILABLE records, we can update cached state
// and start/end offsets.
maybeUpdateCachedStateAndOffsets();
} else {
- updateEndOffsetAndResetFetchOffsetMetadata(startOffset);
+ endOffset = startOffset;
}
// Set the partition state to Active and complete the future.
partitionState = SharePartitionState.ACTIVE;
@@ -943,7 +944,7 @@ public class SharePartition {
// If the cached state is empty, then the start and end offset
will be the new log start offset.
// This can occur during the initialization of share partition
if LSO has moved.
startOffset = logStartOffset;
- updateEndOffsetAndResetFetchOffsetMetadata(logStartOffset);
+ endOffset = logStartOffset;
return;
}
@@ -961,7 +962,7 @@ public class SharePartition {
// This case means that the cached state is completely fresh
now.
// Example scenario - batch of 0-10 in acquired state in
cached state, then LSO moves to 15,
// then endOffset should be 15 as well.
- updateEndOffsetAndResetFetchOffsetMetadata(startOffset);
+ endOffset = startOffset;
}
// Note -
@@ -1192,7 +1193,7 @@ public class SharePartition {
if (cachedState.firstKey() == firstAcquiredOffset) {
startOffset = firstAcquiredOffset;
}
- updateEndOffsetAndResetFetchOffsetMetadata(lastAcquiredOffset);
+ endOffset = lastAcquiredOffset;
return new AcquiredRecords()
.setFirstOffset(firstAcquiredOffset)
.setLastOffset(lastAcquiredOffset)
@@ -1592,27 +1593,21 @@ public class SharePartition {
return Optional.empty();
}
- // The caller of this function is expected to hold lock.writeLock() when
calling this method.
- protected void updateEndOffsetAndResetFetchOffsetMetadata(long
updatedEndOffset) {
- endOffset = updatedEndOffset;
- fetchOffsetMetadata = Optional.empty();
- }
-
- protected void updateFetchOffsetMetadata(Optional<LogOffsetMetadata>
fetchOffsetMetadata) {
+ protected void updateFetchOffsetMetadata(long nextFetchOffset,
LogOffsetMetadata logOffsetMetadata) {
lock.writeLock().lock();
try {
- this.fetchOffsetMetadata = fetchOffsetMetadata;
+ fetchOffsetMetadata.updateOffsetMetadata(nextFetchOffset,
logOffsetMetadata);
} finally {
lock.writeLock().unlock();
}
}
- protected Optional<LogOffsetMetadata> fetchOffsetMetadata() {
+ protected Optional<LogOffsetMetadata> fetchOffsetMetadata(long
nextFetchOffset) {
lock.readLock().lock();
try {
- if (findNextFetchOffset.get())
+ if (fetchOffsetMetadata.offsetMetadata() == null ||
fetchOffsetMetadata.offset() != nextFetchOffset)
return Optional.empty();
- return fetchOffsetMetadata;
+ return Optional.of(fetchOffsetMetadata.offsetMetadata());
} finally {
lock.readLock().unlock();
}
@@ -1696,7 +1691,7 @@ public class SharePartition {
long lastCachedOffset =
cachedState.lastEntry().getValue().lastOffset();
if (lastOffsetAcknowledged == lastCachedOffset) {
startOffset = lastCachedOffset + 1; // The next offset that
will be fetched and acquired in the share partition
- updateEndOffsetAndResetFetchOffsetMetadata(lastCachedOffset +
1);
+ endOffset = lastCachedOffset + 1;
cachedState.clear();
// Nothing further to do.
return;
@@ -2426,4 +2421,30 @@ public class SharePartition {
")";
}
}
+
+ /**
+ * OffsetMetadata class is used to cache offset and its log metadata.
+ */
+ static final class OffsetMetadata {
+ // This offset could be different from offsetMetadata.messageOffset if
it's in the middle of a batch.
+ private long offset;
+ private LogOffsetMetadata offsetMetadata;
+
+ OffsetMetadata() {
+ offset = -1;
+ }
+
+ long offset() {
+ return offset;
+ }
+
+ LogOffsetMetadata offsetMetadata() {
+ return offsetMetadata;
+ }
+
+ void updateOffsetMetadata(long offset, LogOffsetMetadata
offsetMetadata) {
+ this.offset = offset;
+ this.offsetMetadata = offsetMetadata;
+ }
+ }
}
diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
index 12b89cffd37..6ef67afb105 100644
--- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
+++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
@@ -65,6 +65,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
@@ -165,7 +166,7 @@ public class DelayedShareFetchTest {
// We are testing the case when the share partition is getting fetched
for the first time, so for the first time
// the fetchOffsetMetadata will return empty. Post the readFromLog
call, the fetchOffsetMetadata will be
// populated for the share partition, which has 1 as the positional
difference, so it doesn't satisfy the minBytes(2).
- when(sp0.fetchOffsetMetadata())
+ when(sp0.fetchOffsetMetadata(anyLong()))
.thenReturn(Optional.empty())
.thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0)));
LogOffsetMetadata hwmOffsetMetadata = new LogOffsetMetadata(1, 1, 1);
@@ -223,7 +224,7 @@ public class DelayedShareFetchTest {
// functionality to give the file position difference as 1 byte, so it
doesn't satisfy the minBytes(2).
LogOffsetMetadata hwmOffsetMetadata = mock(LogOffsetMetadata.class);
when(hwmOffsetMetadata.positionDiff(any())).thenReturn(1);
-
when(sp0.fetchOffsetMetadata()).thenReturn(Optional.of(mock(LogOffsetMetadata.class)));
+
when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(mock(LogOffsetMetadata.class)));
mockTopicIdPartitionFetchBytes(replicaManager, tp0, hwmOffsetMetadata);
DelayedShareFetch delayedShareFetch =
spy(DelayedShareFetchBuilder.builder()
@@ -271,7 +272,7 @@ public class DelayedShareFetchTest {
ShareAcquiredRecords.fromAcquiredRecords(new
ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
1)));
doAnswer(invocation ->
buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(),
any(), any(ReplicaQuota.class), anyBoolean());
- when(sp0.fetchOffsetMetadata()).thenReturn(Optional.of(new
LogOffsetMetadata(0, 1, 0)));
+ when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new
LogOffsetMetadata(0, 1, 0)));
mockTopicIdPartitionToReturnDataEqualToMinBytes(replicaManager, tp0,
1);
DelayedShareFetch delayedShareFetch =
spy(DelayedShareFetchBuilder.builder()
.withShareFetchData(shareFetch)
diff --git
a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index 06bb123f550..eea0a75c56c 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -123,6 +123,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atMost;
@@ -1677,7 +1678,7 @@ public class SharePartitionManagerTest {
"TestShareFetch", mockTimer,
mockReplicaManager.localBrokerId(),
DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
mockReplicaManagerDelayedShareFetch(mockReplicaManager,
delayedShareFetchPurgatory);
- when(sp1.fetchOffsetMetadata()).thenReturn(Optional.of(new
LogOffsetMetadata(0, 1, 0)));
+ when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new
LogOffsetMetadata(0, 1, 0)));
mockTopicIdPartitionToReturnDataEqualToMinBytes(mockReplicaManager,
tp1, 2);
// Initially you cannot acquire records for both sp1 and sp2.
@@ -1877,7 +1878,7 @@ public class SharePartitionManagerTest {
"TestShareFetch", mockTimer,
mockReplicaManager.localBrokerId(),
DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
mockReplicaManagerDelayedShareFetch(mockReplicaManager,
delayedShareFetchPurgatory);
- when(sp1.fetchOffsetMetadata()).thenReturn(Optional.of(new
LogOffsetMetadata(0, 1, 0)));
+ when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new
LogOffsetMetadata(0, 1, 0)));
mockTopicIdPartitionToReturnDataEqualToMinBytes(mockReplicaManager,
tp1, 1);
// Initially you cannot acquire records for both sp1 and sp2.
@@ -2363,7 +2364,7 @@ public class SharePartitionManagerTest {
"TestShareFetch", mockTimer, replicaManager.localBrokerId(),
DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
mockReplicaManagerDelayedShareFetch(replicaManager,
delayedShareFetchPurgatory);
- when(sp1.fetchOffsetMetadata()).thenReturn(Optional.of(new
LogOffsetMetadata(0, 1, 0)));
+ when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new
LogOffsetMetadata(0, 1, 0)));
mockTopicIdPartitionToReturnDataEqualToMinBytes(replicaManager, tp1,
1);
doAnswer(invocation ->
buildLogReadResult(Collections.singleton(tp1))).when(replicaManager).readFromLog(any(),
any(), any(ReplicaQuota.class), anyBoolean());