This is an automated email from the ASF dual-hosted git repository.
kamalcph pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 32dedbdb556 MINOR: Update the delayed remote fetch purgatory cleanup comment (#20721)
32dedbdb556 is described below
commit 32dedbdb556d89401f9f16846eb6f6d57fe0e8cd
Author: Kamal Chandraprakash <[email protected]>
AuthorDate: Wed Oct 22 08:41:57 2025 +0530
MINOR: Update the delayed remote fetch purgatory cleanup comment (#20721)
- Updated the delayed remote fetch purgatory cleanup comment
Reviewers: Luke Chen <[email protected]>
---
core/src/main/scala/kafka/server/DelayedRemoteFetch.scala | 2 +-
core/src/main/scala/kafka/server/ReplicaManager.scala | 4 ++--
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala | 2 +-
3 files changed, 4 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala b/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
index 03e6f8d230f..fc2926988c0 100644
--- a/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
@@ -109,7 +109,7 @@ class DelayedRemoteFetch(remoteFetchTasks: util.Map[TopicIdPartition, Future[Voi
override def onComplete(): Unit = {
val fetchPartitionData = localReadResults.map { case (tp, result) =>
val remoteFetchResult = remoteFetchResults.get(tp)
- if (remoteFetchInfos.containsKey(tp)
+ if (remoteFetchResults.containsKey(tp)
&& remoteFetchResult.isDone
&& result.error == Errors.NONE
&& result.info.delayedRemoteStorageFetch.isPresent) {
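For readers skimming the hunk above: onComplete reads remoteFetchResults.get(tp) on the preceding line, so the membership check must query that same map; the old code consulted remoteFetchInfos instead. A minimal, self-contained sketch of the corrected pattern (hypothetical names, not the Kafka classes):

    import java.util.concurrent.{CompletableFuture, Future}
    import java.util.{HashMap => JHashMap, Map => JMap}

    object GuardSketch {
      def main(args: Array[String]): Unit = {
        // Stand-in for remoteFetchResults: per-partition futures of remote reads.
        val results: JMap[String, Future[String]] = new JHashMap()
        results.put("tp-0", CompletableFuture.completedFuture("remote-bytes"))

        val tp = "tp-0"
        val remoteResult = results.get(tp)
        // Guard against the same map we just read from, so containsKey and
        // get cannot disagree (the essence of the one-line fix above).
        if (results.containsKey(tp) && remoteResult.isDone)
          println(s"serve remote data: ${remoteResult.get()}")
        else
          println("fall back to the local read result")
      }
    }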
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 959ae2cde38..a4c5416cb34 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1656,6 +1656,8 @@ class ReplicaManager(val config: KafkaConfig,
// create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
val delayedFetchKeys = remoteFetchTasks.asScala.map { case (tp, _) => new TopicPartitionOperationKey(tp) }.toList
+ // We only guarantee eventual cleanup via the next FETCH request for the same set of partitions or
+ // using reaper-thread.
delayedRemoteFetchPurgatory.tryCompleteElseWatch(remoteFetch, delayedFetchKeys.asJava)
}
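The comment added above names the two paths by which a watched remote-fetch operation is eventually cleaned up: a later FETCH request carrying the same partition keys, or the purgatory's reaper thread expiring it. A rough, self-contained sketch of those two paths (hypothetical types, not Kafka's DelayedOperationPurgatory):

    import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}
    import java.util.concurrent.atomic.AtomicBoolean

    // Hypothetical purgatory entry: completed at most once, either by a later
    // request on the same key or by the reaper after the timeout elapses.
    final class DelayedOp(val key: String, onDone: String => Unit) {
      private val done = new AtomicBoolean(false)
      def complete(reason: String): Unit =
        if (done.compareAndSet(false, true)) onDone(reason)
    }

    object PurgatorySketch {
      private val watched = new ConcurrentHashMap[String, DelayedOp]()
      private val reaper = Executors.newSingleThreadScheduledExecutor()

      def tryCompleteElseWatch(op: DelayedOp, timeoutMs: Long): Unit = {
        watched.put(op.key, op)
        // Reaper path: guarantees eventual cleanup even if no further
        // FETCH request ever arrives for this partition.
        reaper.schedule(new Runnable {
          def run(): Unit =
            Option(watched.remove(op.key)).foreach(_.complete("expired by reaper"))
        }, timeoutMs, TimeUnit.MILLISECONDS)
      }

      // FETCH path: the next request on the same key completes the watched op.
      def checkAndComplete(key: String): Unit =
        Option(watched.remove(key)).foreach(_.complete("completed by next FETCH"))

      def main(args: Array[String]): Unit = {
        tryCompleteElseWatch(new DelayedOp("tp-0", r => println(s"tp-0: $r")), 500)
        tryCompleteElseWatch(new DelayedOp("tp-1", r => println(s"tp-1: $r")), 500)
        checkAndComplete("tp-0") // request path fires immediately
        Thread.sleep(1000)       // tp-1 is left to the reaper
        reaper.shutdown()
      }
    }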
@@ -1740,8 +1742,6 @@ class ReplicaManager(val config: KafkaConfig,
// try to complete the request immediately, otherwise put it into the purgatory;
// this is because while the delayed fetch operation is being created, new requests
// may arrive and hence make this operation completable.
- // We only guarantee eventual cleanup via the next FETCH request for the same set of partitions or
- // using reaper-thread.
delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys.asJava)
}
}
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 577e4f6b128..fa43b8bc1a4 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -2965,7 +2965,7 @@ class ReplicaManagerTest {
directoryEventHandler: DirectoryEventHandler = DirectoryEventHandler.NOOP,
buildRemoteLogAuxState: Boolean = false,
remoteFetchQuotaExceeded: Option[Boolean] = None,
- remoteFetchReaperEnabled: Boolean = false,
+ remoteFetchReaperEnabled: Boolean = false
): ReplicaManager = {
val props = TestUtils.createBrokerConfig(brokerId)
val path1 = TestUtils.tempRelativeDir("data").getAbsolutePath