This is an automated email from the ASF dual-hosted git repository.

mittal pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.1 by this push:
     new 6ddcfab5782 KAFKA-19389: Fix memory consumption for completed share 
fetch requests (#19928)
6ddcfab5782 is described below

commit 6ddcfab578245268239767295b416b0ce8a0a358
Author: Apoorv Mittal <[email protected]>
AuthorDate: Tue Jun 10 17:36:27 2025 +0100

    KAFKA-19389: Fix memory consumption for completed share fetch requests 
(#19928)
    
    For ShareFetch requests, the fetch happens through the DelayedShareFetch
    operation. Operations which are already completed hold a reference to
    the data being sent as the response. The operation is watched over
    multiple keys, i.e. DelayedShareFetchGroupKey and
    DelayedShareFetchPartitionKey, so when the operation is completed via
    either watched key, a reference to the operation is still present under
    the other watched key. This means the memory can only be freed once the
    purge operation is triggered by DelayedOperationPurgatory, which removes
    the already-completed operation from the remaining watched keys.
    
    The purge operation is dependent on the config
    `ShareGroupConfig#SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG`,
    hence if the value is not smaller than the number of share fetch
    requests which can consume the complete memory of the broker, then the
    broker can go out of memory. This can also be avoided by having a lower
    fetch max bytes per request, but this value is client dependent and
    hence can't be relied upon to protect the broker.
    
    This PR triggers the completion on both watched keys, so that the
    DelayedShareFetch operation is removed from both keys, which frees
    the broker memory as soon as the share fetch response is sent.
    
    #### Testing
    
    Tested with LocalTieredStorage, where the broker goes OOM after reading
    some 8040 messages before the fix, with default configurations as
    mentioned in the doc
    [here](https://kafka.apache.org/documentation/#tiered_storage_config_ex).
    After the fix the consumption continues without any issue, and the
    memory is released instantaneously.
    
    Reviewers: Jun Rao <[email protected]>, Andrew Schofield
    <[email protected]>
---
 .../java/kafka/server/share/DelayedShareFetch.java | 22 ++++++++++++++++------
 .../share/fetch/DelayedShareFetchPartitionKey.java |  5 +++++
 2 files changed, 21 insertions(+), 6 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java 
b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
index fce50613bec..2d01fb959cb 100644
--- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
+++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -34,6 +34,7 @@ import org.apache.kafka.server.metrics.KafkaMetricsGroup;
 import org.apache.kafka.server.purgatory.DelayedOperation;
 import org.apache.kafka.server.share.SharePartitionKey;
 import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
+import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey;
 import org.apache.kafka.server.share.fetch.PartitionMaxBytesStrategy;
 import org.apache.kafka.server.share.fetch.ShareFetch;
 import org.apache.kafka.server.share.fetch.ShareFetchPartitionData;
@@ -804,13 +805,22 @@ public class DelayedShareFetch extends DelayedOperation {
         }
         // Releasing the lock to move ahead with the next request in queue.
         releasePartitionLocks(topicIdPartitions);
-        // If we have a fetch request completed for a topic-partition, we 
release the locks for that partition,
-        // then we should check if there is a pending share fetch request for 
the topic-partition and complete it.
-        // We add the action to delayed actions queue to avoid an infinite 
call stack, which could happen if
-        // we directly call delayedShareFetchPurgatory.checkAndComplete
-        replicaManager.addToActionQueue(() -> 
topicIdPartitions.forEach(topicIdPartition ->
+        replicaManager.addToActionQueue(() -> 
topicIdPartitions.forEach(topicIdPartition -> {
+            // If we have a fetch request completed for a share-partition, we 
release the locks for that partition,
+            // then we should check if there is a pending share fetch request 
for the share-partition and complete it.
+            // We add the action to delayed actions queue to avoid an infinite 
call stack, which could happen if
+            // we directly call delayedShareFetchPurgatory.checkAndComplete.
             replicaManager.completeDelayedShareFetchRequest(
-                new DelayedShareFetchGroupKey(shareFetch.groupId(), 
topicIdPartition.topicId(), topicIdPartition.partition()))));
+                new DelayedShareFetchGroupKey(shareFetch.groupId(), 
topicIdPartition.topicId(), topicIdPartition.partition()));
+            // As DelayedShareFetch operation is watched over multiple keys, 
same operation might be
+            // completed and can contain references to data fetched. Hence, if 
the operation is not
+            // removed from other watched keys then there can be a memory 
leak. The removal of the
+            // operation is dependent on the purge task by 
DelayedOperationPurgatory. Hence, this can
+            // also be prevented by setting smaller value for configuration 
{@link ShareGroupConfig#SHARE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG}.
+            // However, it's best to trigger the check on all the keys that 
are being watched which
+            // should free the memory for the completed operation.
+            replicaManager.completeDelayedShareFetchRequest(new 
DelayedShareFetchPartitionKey(topicIdPartition));
+        }));
     }
 
     /**
diff --git 
a/server/src/main/java/org/apache/kafka/server/share/fetch/DelayedShareFetchPartitionKey.java
 
b/server/src/main/java/org/apache/kafka/server/share/fetch/DelayedShareFetchPartitionKey.java
index 584613cde17..c1e40975c2b 100644
--- 
a/server/src/main/java/org/apache/kafka/server/share/fetch/DelayedShareFetchPartitionKey.java
+++ 
b/server/src/main/java/org/apache/kafka/server/share/fetch/DelayedShareFetchPartitionKey.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.server.share.fetch;
 
+import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.Uuid;
 
 import java.util.Objects;
@@ -27,6 +28,10 @@ public class DelayedShareFetchPartitionKey implements 
DelayedShareFetchKey {
     private final Uuid topicId;
     private final int partition;
 
+    public DelayedShareFetchPartitionKey(TopicIdPartition topicIdPartition) {
+        this(topicIdPartition.topicId(), topicIdPartition.partition());
+    }
+
     public DelayedShareFetchPartitionKey(Uuid topicId, int partition) {
         this.topicId = topicId;
         this.partition = partition;

Reply via email to