This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 69e2524c45 Fix resource leak in IndexStoreFile shutdown (#9840)
69e2524c45 is described below

commit 69e2524c45c512f550c4daa2a7e4935a63bc1cc7
Author: rongtong <[email protected]>
AuthorDate: Fri Nov 14 11:32:35 2025 +0800

    Fix resource leak in IndexStoreFile shutdown (#9840)
    
    * Fix resource leak in IndexStoreFile shutdown
    
    Add cleanResources() calls after shutdown() for both mappedFile and 
compactMappedFile to ensure proper cleanup of memory-mapped buffers and file 
channels.
    
    Change-Id: I2716b4e3b0cd281e89a9d5a00a389dc6048de3e7
    Co-developed-by: Cursor <[email protected]>
    
    * Add a FileNotFoundException check to allow breaking out of the loop after 
shutdown
    
    Change-Id: Icc0063544ef91de8b2bd96a80f3829a6922cb0e6
    
    * Add a FileNotFoundException check to allow breaking out of the loop after 
shutdown.
    
    Change-Id: I53a7d2800775d80c1db8feb159c5c258b3517f09
    
    ---------
    
    Co-authored-by: RongtongJin <[email protected]>
    Co-authored-by: ShannonDing <[email protected]>
---
 .../rocketmq/tieredstore/index/IndexStoreFile.java | 13 +++++++----
 .../tieredstore/index/IndexStoreService.java       | 25 +++++++++++++---------
 2 files changed, 24 insertions(+), 14 deletions(-)

diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
index c58e91e9c0..e0a3c5cd0a 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
@@ -17,6 +17,7 @@
 package org.apache.rocketmq.tieredstore.index;
 
 import com.google.common.base.Stopwatch;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
@@ -243,13 +244,13 @@ public class IndexStoreFile implements IndexFile {
                     topicId, queueId, offset, size, hashCode, timeDiff, 
slotOldValue);
                 int itemIndex = this.indexItemCount.incrementAndGet();
                 int itemPosition = this.getItemPosition(itemIndex);
-                
+
                 if (writeWithoutMmap && fileChannel != null) {
                     // Use FileChannel for writing
                     ByteBuffer itemBuffer = indexItem.getByteBuffer();
                     fileChannel.position(itemPosition);
                     fileChannel.write(itemBuffer);
-                    
+
                     ByteBuffer slotBuffer = ByteBuffer.allocate(Integer.BYTES);
                     slotBuffer.putInt(0, itemIndex);
                     slotBuffer.position(0);
@@ -436,6 +437,8 @@ public class IndexStoreFile implements IndexFile {
             buffer = compactToNewFile();
             log.debug("IndexStoreFile do compaction, timestamp: {}, file size: 
{}, cost: {}ms",
                 this.getTimestamp(), buffer.capacity(), 
stopwatch.elapsed(TimeUnit.MICROSECONDS));
+        } catch (FileNotFoundException e) {
+            throw new RuntimeException(e);
         } catch (Throwable e) {
             log.error("IndexStoreFile do compaction, timestamp: {}, cost: 
{}ms",
                 this.getTimestamp(), stopwatch.elapsed(TimeUnit.MICROSECONDS), 
e);
@@ -482,7 +485,7 @@ public class IndexStoreFile implements IndexFile {
                 buffer.get(payload);
                 int newSlotValue = 
payloadBuffer.getInt(COMPACT_INDEX_ITEM_SIZE);
                 buffer.limit(COMPACT_INDEX_ITEM_SIZE);
-                
+
                 if (writeWithoutMmap && compactFileChannel != null) {
                     // Use FileChannel for writing
                     ByteBuffer writeBuffer = ByteBuffer.wrap(payload, 0, 
COMPACT_INDEX_ITEM_SIZE);
@@ -537,11 +540,13 @@ public class IndexStoreFile implements IndexFile {
             }
             if (this.mappedFile != null) {
                 this.mappedFile.shutdown(TimeUnit.SECONDS.toMillis(10));
+                this.mappedFile.cleanResources();
             }
             if (this.compactMappedFile != null) {
                 this.compactMappedFile.shutdown(TimeUnit.SECONDS.toMillis(10));
+                this.compactMappedFile.cleanResources();
             }
-        } catch (Exception e) {
+        } catch (Throwable e) {
             log.error("IndexStoreFile shutdown failed, timestamp: {}, status: 
{}", this.getTimestamp(), fileStatus.get(), e);
         } finally {
             fileReadWriteLock.writeLock().unlock();
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
index 07609bbab9..132d2162f9 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
@@ -422,15 +422,19 @@ public class IndexStoreService extends ServiceThread 
implements IndexService {
     @Override
     public void run() {
         while (!this.isStopped()) {
-            long expireTimestamp = System.currentTimeMillis()
-                - 
TimeUnit.HOURS.toMillis(storeConfig.getTieredStoreFileReservedTime());
-            this.destroyExpiredFile(expireTimestamp);
-            IndexFile indexFile = this.getNextSealedFile();
-            if (indexFile != null) {
-                if (this.doCompactThenUploadFile(indexFile)) {
-                    this.setCompactTimestamp(indexFile.getTimestamp());
-                    continue;
+            try {
+                long expireTimestamp = System.currentTimeMillis()
+                    - 
TimeUnit.HOURS.toMillis(storeConfig.getTieredStoreFileReservedTime());
+                this.destroyExpiredFile(expireTimestamp);
+                IndexFile indexFile = this.getNextSealedFile();
+                if (indexFile != null) {
+                    if (this.doCompactThenUploadFile(indexFile)) {
+                        this.setCompactTimestamp(indexFile.getTimestamp());
+                        continue;
+                    }
                 }
+            } catch (Throwable e) {
+                log.error("IndexStoreService running error", e);
             }
             this.waitForRunning(TimeUnit.SECONDS.toMillis(10));
         }
@@ -439,13 +443,14 @@ public class IndexStoreService extends ServiceThread 
implements IndexService {
             if (autoCreateNewFile) {
                 this.forceUpload();
             }
-            this.timeStoreTable.forEach((timestamp, file) -> file.shutdown());
-            this.timeStoreTable.clear();
         } catch (Exception e) {
             log.error("IndexStoreService shutdown error", e);
         } finally {
+            this.timeStoreTable.forEach((timestamp, file) -> file.shutdown());
+            this.timeStoreTable.clear();
             readWriteLock.writeLock().unlock();
         }
+
         log.info(this.getServiceName() + " service shutdown");
     }
 }

Reply via email to