This is an automated email from the ASF dual-hosted git repository.

kamalcph pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 32dedbdb556 MINOR: Update the delayed remote fetch purgatory cleanup comment (#20721)
32dedbdb556 is described below

commit 32dedbdb556d89401f9f16846eb6f6d57fe0e8cd
Author: Kamal Chandraprakash <[email protected]>
AuthorDate: Wed Oct 22 08:41:57 2025 +0530

    MINOR: Update the delayed remote fetch purgatory cleanup comment (#20721)
    
    - Updated the delayed remote fetch purgatory cleanup comment
    
    Reviewers: Luke Chen <[email protected]>
---
 core/src/main/scala/kafka/server/DelayedRemoteFetch.scala      | 2 +-
 core/src/main/scala/kafka/server/ReplicaManager.scala          | 4 ++--
 core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala | 2 +-
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala 
b/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
index 03e6f8d230f..fc2926988c0 100644
--- a/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
@@ -109,7 +109,7 @@ class DelayedRemoteFetch(remoteFetchTasks: 
util.Map[TopicIdPartition, Future[Voi
   override def onComplete(): Unit = {
     val fetchPartitionData = localReadResults.map { case (tp, result) =>
       val remoteFetchResult = remoteFetchResults.get(tp)
-      if (remoteFetchInfos.containsKey(tp)
+      if (remoteFetchResults.containsKey(tp)
         && remoteFetchResult.isDone
         && result.error == Errors.NONE
         && result.info.delayedRemoteStorageFetch.isPresent) {
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 959ae2cde38..a4c5416cb34 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1656,6 +1656,8 @@ class ReplicaManager(val config: KafkaConfig,
 
     // create a list of (topic, partition) pairs to use as keys for this 
delayed fetch operation
     val delayedFetchKeys = remoteFetchTasks.asScala.map { case (tp, _) => new 
TopicPartitionOperationKey(tp) }.toList
+    // We only guarantee eventual cleanup via the next FETCH request for the 
same set of partitions or
+    // using reaper-thread.
     delayedRemoteFetchPurgatory.tryCompleteElseWatch(remoteFetch, 
delayedFetchKeys.asJava)
   }
 
@@ -1740,8 +1742,6 @@ class ReplicaManager(val config: KafkaConfig,
         // try to complete the request immediately, otherwise put it into the 
purgatory;
         // this is because while the delayed fetch operation is being created, 
new requests
         // may arrive and hence make this operation completable.
-        // We only guarantee eventual cleanup via the next FETCH request for 
the same set of partitions or
-        // using reaper-thread.
         delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, 
delayedFetchKeys.asJava)
       }
     }
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 577e4f6b128..fa43b8bc1a4 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -2965,7 +2965,7 @@ class ReplicaManagerTest {
     directoryEventHandler: DirectoryEventHandler = DirectoryEventHandler.NOOP,
     buildRemoteLogAuxState: Boolean = false,
     remoteFetchQuotaExceeded: Option[Boolean] = None,
-    remoteFetchReaperEnabled: Boolean = false,
+    remoteFetchReaperEnabled: Boolean = false
   ): ReplicaManager = {
     val props = TestUtils.createBrokerConfig(brokerId)
     val path1 = TestUtils.tempRelativeDir("data").getAbsolutePath

Reply via email to