This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 69e2524c45 Fix resource leak in IndexStoreFile shutdown (#9840)
69e2524c45 is described below
commit 69e2524c45c512f550c4daa2a7e4935a63bc1cc7
Author: rongtong <[email protected]>
AuthorDate: Fri Nov 14 11:32:35 2025 +0800
Fix resource leak in IndexStoreFile shutdown (#9840)
* Fix resource leak in IndexStoreFile shutdown
Add cleanResources() calls after shutdown() for both mappedFile and
compactMappedFile to ensure proper cleanup of memory-mapped buffers and file
channels.
Change-Id: I2716b4e3b0cd281e89a9d5a00a389dc6048de3e7
Co-developed-by: Cursor <[email protected]>
* Add a FileNotFoundException check to allow breaking out of the loop after
shutdown.
Change-Id: Icc0063544ef91de8b2bd96a80f3829a6922cb0e6
* Add a FileNotFoundException check to allow breaking out of the loop after
shutdown.
Change-Id: I53a7d2800775d80c1db8feb159c5c258b3517f09
---------
Co-authored-by: RongtongJin <[email protected]>
Co-authored-by: ShannonDing <[email protected]>
---
.../rocketmq/tieredstore/index/IndexStoreFile.java | 13 +++++++----
.../tieredstore/index/IndexStoreService.java | 25 +++++++++++++---------
2 files changed, 24 insertions(+), 14 deletions(-)
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
index c58e91e9c0..e0a3c5cd0a 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.tieredstore.index;
import com.google.common.base.Stopwatch;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
@@ -243,13 +244,13 @@ public class IndexStoreFile implements IndexFile {
topicId, queueId, offset, size, hashCode, timeDiff,
slotOldValue);
int itemIndex = this.indexItemCount.incrementAndGet();
int itemPosition = this.getItemPosition(itemIndex);
-
+
if (writeWithoutMmap && fileChannel != null) {
// Use FileChannel for writing
ByteBuffer itemBuffer = indexItem.getByteBuffer();
fileChannel.position(itemPosition);
fileChannel.write(itemBuffer);
-
+
ByteBuffer slotBuffer = ByteBuffer.allocate(Integer.BYTES);
slotBuffer.putInt(0, itemIndex);
slotBuffer.position(0);
@@ -436,6 +437,8 @@ public class IndexStoreFile implements IndexFile {
buffer = compactToNewFile();
log.debug("IndexStoreFile do compaction, timestamp: {}, file size:
{}, cost: {}ms",
this.getTimestamp(), buffer.capacity(),
stopwatch.elapsed(TimeUnit.MICROSECONDS));
+ } catch (FileNotFoundException e) {
+ throw new RuntimeException(e);
} catch (Throwable e) {
log.error("IndexStoreFile do compaction, timestamp: {}, cost:
{}ms",
this.getTimestamp(), stopwatch.elapsed(TimeUnit.MICROSECONDS),
e);
@@ -482,7 +485,7 @@ public class IndexStoreFile implements IndexFile {
buffer.get(payload);
int newSlotValue =
payloadBuffer.getInt(COMPACT_INDEX_ITEM_SIZE);
buffer.limit(COMPACT_INDEX_ITEM_SIZE);
-
+
if (writeWithoutMmap && compactFileChannel != null) {
// Use FileChannel for writing
ByteBuffer writeBuffer = ByteBuffer.wrap(payload, 0,
COMPACT_INDEX_ITEM_SIZE);
@@ -537,11 +540,13 @@ public class IndexStoreFile implements IndexFile {
}
if (this.mappedFile != null) {
this.mappedFile.shutdown(TimeUnit.SECONDS.toMillis(10));
+ this.mappedFile.cleanResources();
}
if (this.compactMappedFile != null) {
this.compactMappedFile.shutdown(TimeUnit.SECONDS.toMillis(10));
+ this.compactMappedFile.cleanResources();
}
- } catch (Exception e) {
+ } catch (Throwable e) {
log.error("IndexStoreFile shutdown failed, timestamp: {}, status:
{}", this.getTimestamp(), fileStatus.get(), e);
} finally {
fileReadWriteLock.writeLock().unlock();
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
index 07609bbab9..132d2162f9 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
@@ -422,15 +422,19 @@ public class IndexStoreService extends ServiceThread
implements IndexService {
@Override
public void run() {
while (!this.isStopped()) {
- long expireTimestamp = System.currentTimeMillis()
- -
TimeUnit.HOURS.toMillis(storeConfig.getTieredStoreFileReservedTime());
- this.destroyExpiredFile(expireTimestamp);
- IndexFile indexFile = this.getNextSealedFile();
- if (indexFile != null) {
- if (this.doCompactThenUploadFile(indexFile)) {
- this.setCompactTimestamp(indexFile.getTimestamp());
- continue;
+ try {
+ long expireTimestamp = System.currentTimeMillis()
+ -
TimeUnit.HOURS.toMillis(storeConfig.getTieredStoreFileReservedTime());
+ this.destroyExpiredFile(expireTimestamp);
+ IndexFile indexFile = this.getNextSealedFile();
+ if (indexFile != null) {
+ if (this.doCompactThenUploadFile(indexFile)) {
+ this.setCompactTimestamp(indexFile.getTimestamp());
+ continue;
+ }
}
+ } catch (Throwable e) {
+ log.error("IndexStoreService running error", e);
}
this.waitForRunning(TimeUnit.SECONDS.toMillis(10));
}
@@ -439,13 +443,14 @@ public class IndexStoreService extends ServiceThread
implements IndexService {
if (autoCreateNewFile) {
this.forceUpload();
}
- this.timeStoreTable.forEach((timestamp, file) -> file.shutdown());
- this.timeStoreTable.clear();
} catch (Exception e) {
log.error("IndexStoreService shutdown error", e);
} finally {
+ this.timeStoreTable.forEach((timestamp, file) -> file.shutdown());
+ this.timeStoreTable.clear();
readWriteLock.writeLock().unlock();
}
+
log.info(this.getServiceName() + " service shutdown");
}
}