This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 94ba5e14f8 [ISSUE #10031] Add PreprocessHandler interface in 
AllocateMappedFileService
94ba5e14f8 is described below

commit 94ba5e14f8c5706f0adc7f04be5e243cde7929d1
Author: guyinyou <[email protected]>
AuthorDate: Wed Jan 21 10:07:51 2026 +0800

    [ISSUE #10031] Add PreprocessHandler interface in AllocateMappedFileService
    
    Change-Id: I4e81916a79f89c095ffb7b860c8ccd49e88c76ea
    
    Co-authored-by: guyinyou <[email protected]>
---
 .../rocketmq/store/AllocateMappedFileService.java  | 34 ++++++++++++++++++++++
 1 file changed, 34 insertions(+)

diff --git 
a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java 
b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
index 7664e284ec..85042fdbc9 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
@@ -45,12 +45,31 @@ public class AllocateMappedFileService extends 
ServiceThread {
         new PriorityBlockingQueue<>();
     private volatile boolean hasException = false;
     private DefaultMessageStore messageStore;
+    private PreprocessHandler preprocessHandler;
 
     public AllocateMappedFileService(DefaultMessageStore messageStore) {
         this.messageStore = messageStore;
     }
 
+    /**
+     * Set preprocess handler for external extension
+     *
+     * @param preprocessHandler the preprocess handler
+     */
+    public void setPreprocessHandler(PreprocessHandler preprocessHandler) {
+        this.preprocessHandler = preprocessHandler;
+    }
+
     public MappedFile putRequestAndReturnMappedFile(String nextFilePath, 
String nextNextFilePath, int fileSize) {
+        // Execute preprocess logic if handler is set
+        final PreprocessHandler finalPreprocessHandler = 
this.preprocessHandler;
+        if (finalPreprocessHandler != null) {
+            try {
+                finalPreprocessHandler.preprocess(nextFilePath, 
nextNextFilePath, fileSize);
+            } catch (Throwable t) {
+                log.warn("Preprocess handler in AllocateMappedFileService 
execution failed", t);
+            }
+        }
         int canSubmitRequests = 2;
         if (this.messageStore.isTransientStorePoolEnable()) {
             if 
(this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
@@ -230,6 +249,21 @@ public class AllocateMappedFileService extends 
ServiceThread {
         return true;
     }
 
+    /**
+     * Preprocess handler interface for external extension
+     */
+    @FunctionalInterface
+    public interface PreprocessHandler {
+        /**
+         * Preprocess before allocating mapped file
+         *
+         * @param nextFilePath the next file path
+         * @param nextNextFilePath the next next file path
+         * @param fileSize the file size
+         */
+        void preprocess(String nextFilePath, String nextNextFilePath, int 
fileSize);
+    }
+
     static class AllocateRequest implements Comparable<AllocateRequest> {
         // Full file path
         private String filePath;

Reply via email to