This is an automated email from the ASF dual-hosted git repository.

lollipopjin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 91cb333716 [ISSUE #10462] Improve error handling in tiered storage 
(#10473)
91cb333716 is described below

commit 91cb333716018d477782754e997bf2232327c97a
Author: lizhimins <[email protected]>
AuthorDate: Fri Jun 12 15:45:06 2026 +0800

    [ISSUE #10462] Improve error handling in tiered storage (#10473)
---
 .../rocketmq/tieredstore/TieredMessageStore.java   |  8 +++++++-
 .../rocketmq/tieredstore/file/FlatAppendFile.java  |  5 +++--
 .../rocketmq/tieredstore/file/FlatMessageFile.java |  3 +--
 .../tieredstore/index/IndexStoreService.java       |  4 ++++
 .../rocketmq/tieredstore/provider/FileSegment.java | 11 +++++-----
 .../tieredstore/provider/FileSegmentFactory.java   |  8 ++++++--
 .../tieredstore/provider/PosixFileSegment.java     |  1 +
 .../tieredstore/file/FlatAppendFileTest.java       | 24 ++++++++++++++++++++++
 8 files changed, 52 insertions(+), 12 deletions(-)

diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
index 2b7c347b3c..75f4248705 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
@@ -27,6 +27,7 @@ import java.lang.reflect.Constructor;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import org.apache.rocketmq.common.BoundaryType;
@@ -292,7 +293,12 @@ public class TieredMessageStore extends 
AbstractPluginMessageStore {
 
                 return result;
             }).exceptionally(e -> {
-                log.error("GetMessageAsync from tiered store failed", e);
+                Throwable cause = e instanceof CompletionException && 
e.getCause() != null ? e.getCause() : e;
+                if (cause instanceof Error) {
+                    throw (Error) cause;
+                }
+                log.error("TieredMessageStore#getMessageAsync, get message 
from tiered store failed, " +
+                    "topic={}, queueId={}, offset={}", topic, queueId, offset, 
cause);
                 return next.getMessage(group, topic, queueId, offset, 
maxMsgNums, messageFilter);
             });
     }
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java
index 8188febcc4..6380d90a49 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java
@@ -38,6 +38,7 @@ public class FlatAppendFile {
 
     protected static final Logger log = 
LoggerFactory.getLogger(MessageStoreUtil.TIERED_STORE_LOGGER_NAME);
     public static final long GET_FILE_SIZE_ERROR = -1L;
+    public static final long GET_TIMESTAMP_ERROR = -1L;
 
     protected final String filePath;
     protected final FileSegmentType fileType;
@@ -164,12 +165,12 @@ public class FlatAppendFile {
 
     public long getMinTimestamp() {
         List<FileSegment> list = this.fileSegmentTable;
-        return list.isEmpty() ? GET_FILE_SIZE_ERROR : 
list.get(0).getMinTimestamp();
+        return list.isEmpty() ? GET_TIMESTAMP_ERROR : 
list.get(0).getMinTimestamp();
     }
 
     public long getMaxTimestamp() {
         List<FileSegment> list = this.fileSegmentTable;
-        return list.isEmpty() ? GET_FILE_SIZE_ERROR : list.get(list.size() - 
1).getMaxTimestamp();
+        return list.isEmpty() ? GET_TIMESTAMP_ERROR : list.get(list.size() - 
1).getMaxTimestamp();
     }
 
     public FileSegment rollingNewFile(long offset) {
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
index 413c6af65e..35084dd2e5 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
@@ -231,8 +231,7 @@ public class FlatMessageFile implements FlatFileInterface {
 
     @Override
     public CompletableFuture<Boolean> commitAsync() {
-        // acquire lock
-        if (commitLock.drainPermits() <= 0) {
+        if (!commitLock.tryAcquire()) {
             return CompletableFuture.completedFuture(false);
         }
 
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
index 0a768e3d1d..74f2b94afa 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
@@ -327,6 +327,10 @@ public class IndexStoreService extends ServiceThread 
implements IndexService {
         boolean result = flatAppendFile.commitAsync().join();
 
         List<FileSegment> fileSegmentList = 
flatAppendFile.getFileSegmentList();
+        if (fileSegmentList.isEmpty()) {
+            log.warn("IndexStoreService#doCompactThenUploadFile, 
fileSegmentList empty, timestamp={}", indexFile.getTimestamp());
+            return false;
+        }
         FileSegment fileSegment = fileSegmentList.get(fileSegmentList.size() - 
1);
         if (!result || fileSegment == null || fileSegment.getMinTimestamp() != 
indexFile.getTimestamp()) {
             log.warn("IndexStoreService#doCompactThenUploadFile, upload 
compacted file error, timestamp={}", indexFile.getTimestamp());
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/FileSegment.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/FileSegment.java
index 7235bc1083..0d4e39b74f 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/FileSegment.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/FileSegment.java
@@ -335,25 +335,26 @@ public abstract class FileSegment implements 
Comparable<FileSegment>, FileSegmen
     public CompletableFuture<ByteBuffer> readAsync(long position, int length) {
         CompletableFuture<ByteBuffer> future = new CompletableFuture<>();
 
-        if (position < 0 || position >= commitPosition) {
+        long currentCommitPosition = commitPosition;
+        if (position < 0 || position >= currentCommitPosition) {
             future.completeExceptionally(new 
TieredStoreException(TieredStoreErrorCode.ILLEGAL_PARAM,
                 String.format("FileSegment read position illegal, filePath=%s, 
fileType=%s, position=%d, length=%d, commit=%d",
-                    filePath, fileType, position, length, commitPosition)));
+                    filePath, fileType, position, length, 
currentCommitPosition)));
             return future;
         }
 
         if (length <= 0) {
             future.completeExceptionally(new 
TieredStoreException(TieredStoreErrorCode.ILLEGAL_PARAM,
                 String.format("FileSegment read length illegal, filePath=%s, 
fileType=%s, position=%d, length=%d, commit=%d",
-                    filePath, fileType, position, length, commitPosition)));
+                    filePath, fileType, position, length, 
currentCommitPosition)));
             return future;
         }
 
-        int readableBytes = (int) (commitPosition - position);
+        int readableBytes = (int) (currentCommitPosition - position);
         if (readableBytes < length) {
             log.debug("FileSegment#readAsync, request position exceeds commit 
position, " +
                     "file={}, requestPosition={}, commitPosition={}, 
changeLength={} to {}",
-                getPath(), position, commitPosition, length, readableBytes);
+                getPath(), position, currentCommitPosition, length, 
readableBytes);
             length = readableBytes;
         }
         return this.read0(position, length);
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/FileSegmentFactory.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/FileSegmentFactory.java
index ace6d8f08f..e4ace510ea 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/FileSegmentFactory.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/FileSegmentFactory.java
@@ -42,7 +42,9 @@ public class FileSegmentFactory {
             fileSegmentConstructor = clazz.getConstructor(
                 MessageStoreConfig.class, FileSegmentType.class, String.class, 
Long.TYPE, MessageStoreExecutor.class);
         } catch (Exception e) {
-            throw new RuntimeException(e);
+            throw new RuntimeException(String.format(
+                "FileSegmentFactory#init, failed to load FileSegment provider: 
%s",
+                storeConfig.getTieredBackendServiceProvider()), e);
         }
     }
 
@@ -58,7 +60,9 @@ public class FileSegmentFactory {
         try {
             return fileSegmentConstructor.newInstance(this.storeConfig, 
fileType, filePath, baseOffset, executor);
         } catch (Exception e) {
-            throw new RuntimeException(e);
+            throw new RuntimeException(String.format(
+                "FileSegmentFactory#createSegment, failed to create 
FileSegment: type=%s, path=%s, baseOffset=%d",
+                fileType, filePath, baseOffset), e);
         }
     }
 
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java
index 70dd4f081b..63196135f6 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java
@@ -202,6 +202,7 @@ public class PosixFileSegment extends FileSegment {
                 long costTime = 
stopwatch.stop().elapsed(TimeUnit.MILLISECONDS);
                 attributesBuilder.put(LABEL_SUCCESS, false);
                 TieredStoreMetricsManager.providerRpcLatency.record(costTime, 
attributesBuilder.build());
+                throw new RuntimeException(e);
             }
             return byteBuffer;
         }, executor.getBufferFetchExecutor());
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatAppendFileTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatAppendFileTest.java
index ed223fcf9a..b3df4e8aec 100644
--- 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatAppendFileTest.java
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatAppendFileTest.java
@@ -37,6 +37,9 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.mockito.Mockito.when;
 
 public class FlatAppendFileTest {
 
@@ -216,4 +219,25 @@ public class FlatAppendFileTest {
         flatFile.destroy();
         Assert.assertEquals(0, flatFile.fileSegmentTable.size());
     }
+
+    @Test
+    public void getFileCorrectSizeTest() {
+        String filePath = MessageStoreUtil.toFilePath(queue);
+        FlatAppendFile flatFile = 
flatFileFactory.createFlatFileForConsumeQueue(filePath);
+
+        // first try succeeds
+        FileSegment success = Mockito.mock(FileSegment.class);
+        when(success.getSize()).thenReturn(1024L);
+        Assert.assertEquals(1024L, flatFile.getFileCorrectSize(success));
+        Mockito.verify(success, Mockito.times(1)).getSize();
+
+        // retry then succeed
+        FileSegment retry = Mockito.mock(FileSegment.class);
+        when(retry.getSize())
+            .thenReturn(FlatAppendFile.GET_FILE_SIZE_ERROR)
+            .thenReturn(FlatAppendFile.GET_FILE_SIZE_ERROR)
+            .thenReturn(2048L);
+        Assert.assertEquals(2048L, flatFile.getFileCorrectSize(retry));
+        Mockito.verify(retry, Mockito.times(3)).getSize();
+    }
 }

Reply via email to