This is an automated email from the ASF dual-hosted git repository. kamalcph pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new f0c3d93104c KAFKA-19597: Stop the RSM after closing the remote-log reader threads to handle requests gracefully (#20342) f0c3d93104c is described below commit f0c3d93104caa33a2566eff0656de37573a3922a Author: Kamal Chandraprakash <kamal.chandraprak...@gmail.com> AuthorDate: Tue Aug 19 21:56:27 2025 +0530 KAFKA-19597: Stop the RSM after closing the remote-log reader threads to handle requests gracefully (#20342) During shutdown, when the RSM closes first, then the ongoing requests might throw an error. To handle the ongoing requests gracefully, closing the RSM after closing the remote-log reader thread pools. Reviewers: Satish Duggana <sati...@apache.org> --- .../apache/kafka/server/log/remote/storage/RemoteLogManager.java | 8 ++++---- .../kafka/server/log/remote/storage/RemoteLogManagerTest.java | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java index 47b5417e25a..a9b2c67ba79 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java @@ -2038,9 +2038,6 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader { leaderCopyRLMTasks.values().forEach(RLMTaskWithFuture::cancel); leaderExpirationRLMTasks.values().forEach(RLMTaskWithFuture::cancel); followerRLMTasks.values().forEach(RLMTaskWithFuture::cancel); - Utils.closeQuietly(remoteStorageManagerPlugin, "remoteStorageManagerPlugin"); - Utils.closeQuietly(remoteLogMetadataManagerPlugin, "remoteLogMetadataManagerPlugin"); - Utils.closeQuietly(indexCache, "RemoteIndexCache"); rlmCopyThreadPool.close(); rlmExpirationThreadPool.close(); @@ -2050,10 +2047,13 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader { } finally { removeMetrics(); } - leaderCopyRLMTasks.clear(); leaderExpirationRLMTasks.clear(); followerRLMTasks.clear(); + + Utils.closeQuietly(indexCache, "RemoteIndexCache"); + Utils.closeQuietly(remoteLogMetadataManagerPlugin, "remoteLogMetadataManagerPlugin"); + Utils.closeQuietly(remoteStorageManagerPlugin, "remoteStorageManagerPlugin"); closed = true; } } diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java index 5201511e75d..b45b1118f5c 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java @@ -1769,9 +1769,9 @@ public class RemoteLogManagerTest { void testIdempotentClose() throws IOException { remoteLogManager.close(); remoteLogManager.close(); - InOrder inorder = inOrder(remoteStorageManager, remoteLogMetadataManager); - inorder.verify(remoteStorageManager, times(1)).close(); + InOrder inorder = inOrder(remoteLogMetadataManager, remoteStorageManager); inorder.verify(remoteLogMetadataManager, times(1)).close(); + inorder.verify(remoteStorageManager, times(1)).close(); } @Test