This is an automated email from the ASF dual-hosted git repository.
lollipopjin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 91cb333716 [ISSUE #10462] Improve error handling in tiered storage
(#10473)
91cb333716 is described below
commit 91cb333716018d477782754e997bf2232327c97a
Author: lizhimins <[email protected]>
AuthorDate: Fri Jun 12 15:45:06 2026 +0800
[ISSUE #10462] Improve error handling in tiered storage (#10473)
---
.../rocketmq/tieredstore/TieredMessageStore.java | 8 +++++++-
.../rocketmq/tieredstore/file/FlatAppendFile.java | 5 +++--
.../rocketmq/tieredstore/file/FlatMessageFile.java | 3 +--
.../tieredstore/index/IndexStoreService.java | 4 ++++
.../rocketmq/tieredstore/provider/FileSegment.java | 11 +++++-----
.../tieredstore/provider/FileSegmentFactory.java | 8 ++++++--
.../tieredstore/provider/PosixFileSegment.java | 1 +
.../tieredstore/file/FlatAppendFileTest.java | 24 ++++++++++++++++++++++
8 files changed, 52 insertions(+), 12 deletions(-)
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
index 2b7c347b3c..75f4248705 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
@@ -27,6 +27,7 @@ import java.lang.reflect.Constructor;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.rocketmq.common.BoundaryType;
@@ -292,7 +293,12 @@ public class TieredMessageStore extends
AbstractPluginMessageStore {
return result;
}).exceptionally(e -> {
- log.error("GetMessageAsync from tiered store failed", e);
+ Throwable cause = e instanceof CompletionException &&
e.getCause() != null ? e.getCause() : e;
+ if (cause instanceof Error) {
+ throw (Error) cause;
+ }
+ log.error("TieredMessageStore#getMessageAsync, get message
from tiered store failed, " +
+ "topic={}, queueId={}, offset={}", topic, queueId, offset,
cause);
return next.getMessage(group, topic, queueId, offset,
maxMsgNums, messageFilter);
});
}
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java
index 8188febcc4..6380d90a49 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java
@@ -38,6 +38,7 @@ public class FlatAppendFile {
protected static final Logger log =
LoggerFactory.getLogger(MessageStoreUtil.TIERED_STORE_LOGGER_NAME);
public static final long GET_FILE_SIZE_ERROR = -1L;
+ public static final long GET_TIMESTAMP_ERROR = -1L;
protected final String filePath;
protected final FileSegmentType fileType;
@@ -164,12 +165,12 @@ public class FlatAppendFile {
public long getMinTimestamp() {
List<FileSegment> list = this.fileSegmentTable;
- return list.isEmpty() ? GET_FILE_SIZE_ERROR :
list.get(0).getMinTimestamp();
+ return list.isEmpty() ? GET_TIMESTAMP_ERROR :
list.get(0).getMinTimestamp();
}
public long getMaxTimestamp() {
List<FileSegment> list = this.fileSegmentTable;
- return list.isEmpty() ? GET_FILE_SIZE_ERROR : list.get(list.size() -
1).getMaxTimestamp();
+ return list.isEmpty() ? GET_TIMESTAMP_ERROR : list.get(list.size() -
1).getMaxTimestamp();
}
public FileSegment rollingNewFile(long offset) {
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
index 413c6af65e..35084dd2e5 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
@@ -231,8 +231,7 @@ public class FlatMessageFile implements FlatFileInterface {
@Override
public CompletableFuture<Boolean> commitAsync() {
- // acquire lock
- if (commitLock.drainPermits() <= 0) {
+ if (!commitLock.tryAcquire()) {
return CompletableFuture.completedFuture(false);
}
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
index 0a768e3d1d..74f2b94afa 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
@@ -327,6 +327,10 @@ public class IndexStoreService extends ServiceThread
implements IndexService {
boolean result = flatAppendFile.commitAsync().join();
List<FileSegment> fileSegmentList =
flatAppendFile.getFileSegmentList();
+ if (fileSegmentList.isEmpty()) {
+ log.warn("IndexStoreService#doCompactThenUploadFile,
fileSegmentList empty, timestamp={}", indexFile.getTimestamp());
+ return false;
+ }
FileSegment fileSegment = fileSegmentList.get(fileSegmentList.size() -
1);
if (!result || fileSegment == null || fileSegment.getMinTimestamp() !=
indexFile.getTimestamp()) {
log.warn("IndexStoreService#doCompactThenUploadFile, upload
compacted file error, timestamp={}", indexFile.getTimestamp());
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/FileSegment.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/FileSegment.java
index 7235bc1083..0d4e39b74f 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/FileSegment.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/FileSegment.java
@@ -335,25 +335,26 @@ public abstract class FileSegment implements
Comparable<FileSegment>, FileSegmen
public CompletableFuture<ByteBuffer> readAsync(long position, int length) {
CompletableFuture<ByteBuffer> future = new CompletableFuture<>();
- if (position < 0 || position >= commitPosition) {
+ long currentCommitPosition = commitPosition;
+ if (position < 0 || position >= currentCommitPosition) {
future.completeExceptionally(new
TieredStoreException(TieredStoreErrorCode.ILLEGAL_PARAM,
String.format("FileSegment read position illegal, filePath=%s,
fileType=%s, position=%d, length=%d, commit=%d",
- filePath, fileType, position, length, commitPosition)));
+ filePath, fileType, position, length,
currentCommitPosition)));
return future;
}
if (length <= 0) {
future.completeExceptionally(new
TieredStoreException(TieredStoreErrorCode.ILLEGAL_PARAM,
String.format("FileSegment read length illegal, filePath=%s,
fileType=%s, position=%d, length=%d, commit=%d",
- filePath, fileType, position, length, commitPosition)));
+ filePath, fileType, position, length,
currentCommitPosition)));
return future;
}
- int readableBytes = (int) (commitPosition - position);
+ int readableBytes = (int) (currentCommitPosition - position);
if (readableBytes < length) {
log.debug("FileSegment#readAsync, request position exceeds commit
position, " +
"file={}, requestPosition={}, commitPosition={},
changeLength={} to {}",
- getPath(), position, commitPosition, length, readableBytes);
+ getPath(), position, currentCommitPosition, length,
readableBytes);
length = readableBytes;
}
return this.read0(position, length);
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/FileSegmentFactory.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/FileSegmentFactory.java
index ace6d8f08f..e4ace510ea 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/FileSegmentFactory.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/FileSegmentFactory.java
@@ -42,7 +42,9 @@ public class FileSegmentFactory {
fileSegmentConstructor = clazz.getConstructor(
MessageStoreConfig.class, FileSegmentType.class, String.class,
Long.TYPE, MessageStoreExecutor.class);
} catch (Exception e) {
- throw new RuntimeException(e);
+ throw new RuntimeException(String.format(
+ "FileSegmentFactory#init, failed to load FileSegment provider:
%s",
+ storeConfig.getTieredBackendServiceProvider()), e);
}
}
@@ -58,7 +60,9 @@ public class FileSegmentFactory {
try {
return fileSegmentConstructor.newInstance(this.storeConfig,
fileType, filePath, baseOffset, executor);
} catch (Exception e) {
- throw new RuntimeException(e);
+ throw new RuntimeException(String.format(
+ "FileSegmentFactory#createSegment, failed to create
FileSegment: type=%s, path=%s, baseOffset=%d",
+ fileType, filePath, baseOffset), e);
}
}
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java
index 70dd4f081b..63196135f6 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java
@@ -202,6 +202,7 @@ public class PosixFileSegment extends FileSegment {
long costTime =
stopwatch.stop().elapsed(TimeUnit.MILLISECONDS);
attributesBuilder.put(LABEL_SUCCESS, false);
TieredStoreMetricsManager.providerRpcLatency.record(costTime,
attributesBuilder.build());
+ throw new RuntimeException(e);
}
return byteBuffer;
}, executor.getBufferFetchExecutor());
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatAppendFileTest.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatAppendFileTest.java
index ed223fcf9a..b3df4e8aec 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatAppendFileTest.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatAppendFileTest.java
@@ -37,6 +37,9 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.mockito.Mockito.when;
public class FlatAppendFileTest {
@@ -216,4 +219,25 @@ public class FlatAppendFileTest {
flatFile.destroy();
Assert.assertEquals(0, flatFile.fileSegmentTable.size());
}
+
+ @Test
+ public void getFileCorrectSizeTest() {
+ String filePath = MessageStoreUtil.toFilePath(queue);
+ FlatAppendFile flatFile =
flatFileFactory.createFlatFileForConsumeQueue(filePath);
+
+ // first try succeeds
+ FileSegment success = Mockito.mock(FileSegment.class);
+ when(success.getSize()).thenReturn(1024L);
+ Assert.assertEquals(1024L, flatFile.getFileCorrectSize(success));
+ Mockito.verify(success, Mockito.times(1)).getSize();
+
+ // retry then succeed
+ FileSegment retry = Mockito.mock(FileSegment.class);
+ when(retry.getSize())
+ .thenReturn(FlatAppendFile.GET_FILE_SIZE_ERROR)
+ .thenReturn(FlatAppendFile.GET_FILE_SIZE_ERROR)
+ .thenReturn(2048L);
+ Assert.assertEquals(2048L, flatFile.getFileCorrectSize(retry));
+ Mockito.verify(retry, Mockito.times(3)).getSize();
+ }
}