This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.9 by this push:
     new c37ac21cc9a KAFKA-15931: Cancel RemoteLogReader gracefully (#19150)
c37ac21cc9a is described below

commit c37ac21cc9ad11e0bec301ae9c84a35edd5392c3
Author: Jorge Esteban Quilcate Otoya <[email protected]>
AuthorDate: Wed Apr 2 00:10:12 2025 +0300

    KAFKA-15931: Cancel RemoteLogReader gracefully (#19150)
    
    Backports f24945b519005c0bc7a28db2db7aae6cec158927 to 3.9
    
    Instead of reopening the transaction index, this change cancels the RemoteFetchTask 
without interrupting it, which avoids closing the TransactionIndex channel.
    
    As a result, the remote fetch runs to completion, but its results are 
ignored. Given that this is considered a rare case, we can live with 
this. If it becomes a performance issue, it could be optimized.
    
    Reviewers: Jun Rao <[email protected]>
---
 core/src/main/scala/kafka/server/DelayedRemoteFetch.scala            | 5 +++--
 .../test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala | 2 +-
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala 
b/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
index 58a866aa4a6..f9776fb287d 100644
--- a/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
@@ -84,8 +84,9 @@ class DelayedRemoteFetch(remoteFetchTask: Future[Void],
   }
 
   override def onExpiration(): Unit = {
-    // cancel the remote storage read task, if it has not been executed yet
-    val cancelled = remoteFetchTask.cancel(true)
+    // cancel the remote storage read task, if it has not been executed yet and
+    // avoid interrupting the task if it is already running as it may force 
closing opened/cached resources as transaction index.
+    val cancelled = remoteFetchTask.cancel(false)
     if (!cancelled) debug(s"Remote fetch task for RemoteStorageFetchInfo: 
$remoteFetchInfo could not be cancelled and its isDone value is 
${remoteFetchTask.isDone}")
 
     DelayedRemoteFetchMetrics.expiredRequestMeter.mark()
diff --git 
a/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala 
b/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala
index ea1ffaf0b11..ce758992fea 100644
--- a/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala
@@ -199,7 +199,7 @@ class DelayedRemoteFetchTest {
     delayedRemoteFetch.run()
 
     // Check that the task was cancelled and force-completed
-    verify(remoteFetchTask).cancel(true)
+    verify(remoteFetchTask).cancel(false)
     assertTrue(delayedRemoteFetch.isCompleted)
 
     // Check that the ExpiresPerSec metric was incremented

Reply via email to