This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 617c96cea49 KAFKA-15931: Cancel RemoteLogReader gracefully (#19331)
617c96cea49 is described below
commit 617c96cea49ca9dda52ccc75ab2b6df56b24294c
Author: Jorge Esteban Quilcate Otoya <[email protected]>
AuthorDate: Wed Apr 2 02:22:53 2025 +0300
KAFKA-15931: Cancel RemoteLogReader gracefully (#19331)
Backports f24945b519005c0bc7a28db2db7aae6cec158927 to 4.0
Instead of reopening the transaction index, this change cancels the
RemoteFetchTask without interrupting it, which avoids closing the
TransactionIndex channel. As a result, a remote fetch that is already
running will run to completion, but its results will be ignored. Given
that this is considered a rare case, we can live with this. If it becomes
a performance issue, it can be optimized.
Reviewers: Jun Rao <[email protected]>
---
core/src/main/scala/kafka/server/DelayedRemoteFetch.scala | 5 +++--
.../test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala | 2 +-
2 files changed, 4 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
b/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
index 45bfe69844a..e6bdce63e68 100644
--- a/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
@@ -87,8 +87,9 @@ class DelayedRemoteFetch(remoteFetchTask: Future[Void],
}
override def onExpiration(): Unit = {
- // cancel the remote storage read task, if it has not been executed yet
- val cancelled = remoteFetchTask.cancel(true)
+ // cancel the remote storage read task, if it has not been executed yet and
+ // avoid interrupting the task if it is already running as it may force
closing opened/cached resources such as the transaction index.
+ val cancelled = remoteFetchTask.cancel(false)
if (!cancelled) debug(s"Remote fetch task for RemoteStorageFetchInfo:
$remoteFetchInfo could not be cancelled and its isDone value is
${remoteFetchTask.isDone}")
DelayedRemoteFetchMetrics.expiredRequestMeter.mark()
diff --git
a/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala
b/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala
index 264f5310c2d..b3f032e3dba 100644
--- a/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala
@@ -200,7 +200,7 @@ class DelayedRemoteFetchTest {
delayedRemoteFetch.run()
// Check that the task was cancelled and force-completed
- verify(remoteFetchTask).cancel(true)
+ verify(remoteFetchTask).cancel(false)
assertTrue(delayedRemoteFetch.isCompleted)
// Check that the ExpiresPerSec metric was incremented