This is an automated email from the ASF dual-hosted git repository.
mittal pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.1 by this push:
new 6ddcfab5782 KAFKA-19389: Fix memory consumption for completed share
fetch requests (#19928)
6ddcfab5782 is described below
commit 6ddcfab578245268239767295b416b0ce8a0a358
Author: Apoorv Mittal <[email protected]>
AuthorDate: Tue Jun 10 17:36:27 2025 +0100
KAFKA-19389: Fix memory consumption for completed share fetch requests
(#19928)
For ShareFetch requests, the fetch happens through the DelayedShareFetch
operation. Operations that are already completed hold references to the
data being sent as the response. Since the operation is watched over
multiple keys, i.e. DelayedShareFetchGroupKey and
DelayedShareFetchPartitionKey, even if the operation is completed via one
of the watched keys, the reference to the operation is still present in
the other watched key. This means the memory can only be freed once the
purge operation is triggered by DelayedOperationPurgatory, which removes
the already completed operation from the remaining keys.
The purge operation depends on the config
`ShareGroupConfig#SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG`;
hence, if this value is not smaller than the number of share fetch
requests required to consume the broker's entire memory, the broker can
go out of memory. This could also be avoided by using a lower fetch max
bytes per request, but that value is client dependent, so it cannot be
relied upon to protect the broker.
This PR triggers the completion on both watched keys, hence the
DelayedShareFetch operation is removed from both keys, which frees the
broker memory as soon as the share fetch response is sent.
#### Testing
Tested with LocalTieredStorage, where before the fix the broker went OOM
after reading some 8040 messages, with the default configurations as
mentioned in the doc
[here](https://kafka.apache.org/documentation/#tiered_storage_config_ex).
After the fix, consumption continues without any issue and the memory is
released instantaneously.
Reviewers: Jun Rao <[email protected]>, Andrew Schofield
<[email protected]>
---
.../java/kafka/server/share/DelayedShareFetch.java | 22 ++++++++++++++++------
.../share/fetch/DelayedShareFetchPartitionKey.java | 5 +++++
2 files changed, 21 insertions(+), 6 deletions(-)
diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
index fce50613bec..2d01fb959cb 100644
--- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
+++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -34,6 +34,7 @@ import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.apache.kafka.server.purgatory.DelayedOperation;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
+import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey;
import org.apache.kafka.server.share.fetch.PartitionMaxBytesStrategy;
import org.apache.kafka.server.share.fetch.ShareFetch;
import org.apache.kafka.server.share.fetch.ShareFetchPartitionData;
@@ -804,13 +805,22 @@ public class DelayedShareFetch extends DelayedOperation {
}
// Releasing the lock to move ahead with the next request in queue.
releasePartitionLocks(topicIdPartitions);
- // If we have a fetch request completed for a topic-partition, we
release the locks for that partition,
- // then we should check if there is a pending share fetch request for
the topic-partition and complete it.
- // We add the action to delayed actions queue to avoid an infinite
call stack, which could happen if
- // we directly call delayedShareFetchPurgatory.checkAndComplete
- replicaManager.addToActionQueue(() ->
topicIdPartitions.forEach(topicIdPartition ->
+ replicaManager.addToActionQueue(() ->
topicIdPartitions.forEach(topicIdPartition -> {
+ // If we have a fetch request completed for a share-partition, we
release the locks for that partition,
+ // then we should check if there is a pending share fetch request
for the share-partition and complete it.
+ // We add the action to delayed actions queue to avoid an infinite
call stack, which could happen if
+ // we directly call delayedShareFetchPurgatory.checkAndComplete.
replicaManager.completeDelayedShareFetchRequest(
- new DelayedShareFetchGroupKey(shareFetch.groupId(),
topicIdPartition.topicId(), topicIdPartition.partition()))));
+ new DelayedShareFetchGroupKey(shareFetch.groupId(),
topicIdPartition.topicId(), topicIdPartition.partition()));
+ // As DelayedShareFetch operation is watched over multiple keys,
same operation might be
+ // completed and can contain references to data fetched. Hence, if
the operation is not
+ // removed from other watched keys then there can be a memory
leak. The removal of the
+ // operation is dependent on the purge task by
DelayedOperationPurgatory. Hence, this can
+ // also be prevented by setting smaller value for configuration
{@link ShareGroupConfig#SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG}.
+ // However, it's best to trigger the check on all the keys that
are being watched which
+ // should free the memory for the completed operation.
+ replicaManager.completeDelayedShareFetchRequest(new
DelayedShareFetchPartitionKey(topicIdPartition));
+ }));
}
/**
diff --git
a/server/src/main/java/org/apache/kafka/server/share/fetch/DelayedShareFetchPartitionKey.java
b/server/src/main/java/org/apache/kafka/server/share/fetch/DelayedShareFetchPartitionKey.java
index 584613cde17..c1e40975c2b 100644
---
a/server/src/main/java/org/apache/kafka/server/share/fetch/DelayedShareFetchPartitionKey.java
+++
b/server/src/main/java/org/apache/kafka/server/share/fetch/DelayedShareFetchPartitionKey.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.server.share.fetch;
+import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import java.util.Objects;
@@ -27,6 +28,10 @@ public class DelayedShareFetchPartitionKey implements
DelayedShareFetchKey {
private final Uuid topicId;
private final int partition;
+ public DelayedShareFetchPartitionKey(TopicIdPartition topicIdPartition) {
+ this(topicIdPartition.topicId(), topicIdPartition.partition());
+ }
+
public DelayedShareFetchPartitionKey(Uuid topicId, int partition) {
this.topicId = topicId;
this.partition = partition;