This is an automated email from the ASF dual-hosted git repository.

kamalcph pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f0c3d93104c KAFKA-19597: Stop the RSM after closing the remote-log reader threads to handle requests gracefully (#20342)
f0c3d93104c is described below

commit f0c3d93104caa33a2566eff0656de37573a3922a
Author: Kamal Chandraprakash <kamal.chandraprak...@gmail.com>
AuthorDate: Tue Aug 19 21:56:27 2025 +0530

    KAFKA-19597: Stop the RSM after closing the remote-log reader threads to handle requests gracefully (#20342)
    
    During shutdown, if the RSM (RemoteStorageManager) is closed first,
    ongoing requests might throw an error. To handle ongoing requests
    gracefully, close the RSM after closing the remote-log reader thread
    pools.
    
    Reviewers: Satish Duggana <sati...@apache.org>
---
 .../apache/kafka/server/log/remote/storage/RemoteLogManager.java  | 8 ++++----
 .../kafka/server/log/remote/storage/RemoteLogManagerTest.java     | 4 ++--
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
index 47b5417e25a..a9b2c67ba79 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
@@ -2038,9 +2038,6 @@ public class RemoteLogManager implements Closeable, 
AsyncOffsetReader {
                 leaderCopyRLMTasks.values().forEach(RLMTaskWithFuture::cancel);
                 
leaderExpirationRLMTasks.values().forEach(RLMTaskWithFuture::cancel);
                 followerRLMTasks.values().forEach(RLMTaskWithFuture::cancel);
-                Utils.closeQuietly(remoteStorageManagerPlugin, 
"remoteStorageManagerPlugin");
-                Utils.closeQuietly(remoteLogMetadataManagerPlugin, 
"remoteLogMetadataManagerPlugin");
-                Utils.closeQuietly(indexCache, "RemoteIndexCache");
 
                 rlmCopyThreadPool.close();
                 rlmExpirationThreadPool.close();
@@ -2050,10 +2047,13 @@ public class RemoteLogManager implements Closeable, 
AsyncOffsetReader {
                 } finally {
                     removeMetrics();
                 }
-
                 leaderCopyRLMTasks.clear();
                 leaderExpirationRLMTasks.clear();
                 followerRLMTasks.clear();
+
+                Utils.closeQuietly(indexCache, "RemoteIndexCache");
+                Utils.closeQuietly(remoteLogMetadataManagerPlugin, 
"remoteLogMetadataManagerPlugin");
+                Utils.closeQuietly(remoteStorageManagerPlugin, 
"remoteStorageManagerPlugin");
                 closed = true;
             }
         }
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
index 5201511e75d..b45b1118f5c 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
@@ -1769,9 +1769,9 @@ public class RemoteLogManagerTest {
     void testIdempotentClose() throws IOException {
         remoteLogManager.close();
         remoteLogManager.close();
-        InOrder inorder = inOrder(remoteStorageManager, 
remoteLogMetadataManager);
-        inorder.verify(remoteStorageManager, times(1)).close();
+        InOrder inorder = inOrder(remoteLogMetadataManager, 
remoteStorageManager);
         inorder.verify(remoteLogMetadataManager, times(1)).close();
+        inorder.verify(remoteStorageManager, times(1)).close();
     }
 
     @Test

Reply via email to