This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new c78061bf6c [ISSUE#7280] Fix and refactor handle commit exception in
tiered storage (#7281)
c78061bf6c is described below
commit c78061bf6ca5f35452510ec4107c46735c51c316
Author: lizhimins <[email protected]>
AuthorDate: Wed Aug 30 22:29:51 2023 +0800
[ISSUE#7280] Fix and refactor handle commit exception in tiered storage
(#7281)
* refactor handle commit exception
* refactor handle commit exception
* fix handle commit exception
---
.../rocketmq/tieredstore/TieredDispatcher.java | 3 +-
.../rocketmq/tieredstore/TieredMessageFetcher.java | 57 ++--
.../rocketmq/tieredstore/TieredMessageStore.java | 26 +-
.../tieredstore/provider/TieredFileSegment.java | 291 ++++++++++++---------
.../tieredstore/provider/TieredStoreProvider.java | 8 +-
.../provider/posix/PosixFileSegment.java | 4 +-
.../CommitLogInputStream.java} | 30 ++-
.../FileSegmentInputStream.java} | 49 ++--
.../FileSegmentInputStreamFactory.java} | 26 +-
.../tieredstore/TieredMessageStoreTest.java | 14 +-
.../tieredstore/file/TieredFlatFileTest.java | 2 +
.../tieredstore/file/TieredIndexFileTest.java | 2 +
...Stream.java => MockFileSegmentInputStream.java} | 8 +-
.../provider/TieredFileSegmentInputStreamTest.java | 24 +-
.../provider/TieredFileSegmentTest.java | 89 ++++++-
.../provider/memory/MemoryFileSegment.java | 27 +-
.../memory/MemoryFileSegmentWithoutCheck.java | 4 +-
17 files changed, 427 insertions(+), 237 deletions(-)
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
index 1746190cdb..430c2b62eb 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
@@ -318,8 +318,7 @@ public class TieredDispatcher extends ServiceThread
implements CommitLogDispatch
continue;
case FILE_CLOSED:
tieredFlatFileManager.destroyCompositeFile(flatFile.getMessageQueue());
- logger.info("TieredDispatcher#dispatchFlatFile: file
has been close and destroy, " +
- "topic: {}, queueId: {}", topic, queueId);
+ logger.info("File has been closed and destroy, topic:
{}, queueId: {}", topic, queueId);
return;
default:
dispatchOffset--;
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
index 9a9a3e5a5c..766ff64f6c 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
@@ -273,15 +273,17 @@ public class TieredMessageFetcher implements
MessageStoreFetcher {
TieredStoreMetricsManager.cacheHit.add(resultWrapperList.size(),
attributes);
}
- // if no cached message found and there is currently an inflight
request, wait for the request to end before continuing
+ // If there are no messages in the cache and there are currently
requests being pulled.
+ // We need to wait for the request to return before continuing.
if (resultWrapperList.isEmpty() && waitInflightRequest) {
- CompletableFuture<Long> future =
flatFile.getInflightRequest(group, queueOffset, maxCount)
- .getFuture(queueOffset);
+ CompletableFuture<Long> future =
+ flatFile.getInflightRequest(group, queueOffset,
maxCount).getFuture(queueOffset);
if (!future.isDone()) {
Stopwatch stopwatch = Stopwatch.createStarted();
// to prevent starvation issues, only allow waiting for
inflight request once
return future.thenCompose(v -> {
-
LOGGER.debug("TieredMessageFetcher#getMessageFromCacheAsync: wait for inflight
request cost: {}ms", stopwatch.elapsed(TimeUnit.MILLISECONDS));
+ LOGGER.debug("MessageFetcher#getMessageFromCacheAsync:
wait for response cost: {}ms",
+ stopwatch.elapsed(TimeUnit.MILLISECONDS));
return getMessageFromCacheAsync(flatFile, group,
queueOffset, maxCount, false);
});
}
@@ -302,7 +304,8 @@ public class TieredMessageFetcher implements
MessageStoreFetcher {
// if cache hit, result will be returned immediately and
asynchronously prefetch messages for later requests
if (!resultWrapperList.isEmpty()) {
- LOGGER.debug("TieredMessageFetcher#getMessageFromCacheAsync: cache
hit: topic: {}, queue: {}, queue offset: {}, max message num: {}, cache hit
num: {}",
+ LOGGER.debug("MessageFetcher#getMessageFromCacheAsync: cache hit:
" +
+ "topic: {}, queue: {}, queue offset: {}, max message num:
{}, cache hit num: {}",
mq.getTopic(), mq.getQueueId(), queueOffset, maxCount,
resultWrapperList.size());
prefetchMessage(flatFile, group, maxCount, lastGetOffset + 1);
@@ -316,8 +319,10 @@ public class TieredMessageFetcher implements
MessageStoreFetcher {
}
// if cache is miss, immediately pull messages
- LOGGER.warn("TieredMessageFetcher#getMessageFromCacheAsync: cache
miss: topic: {}, queue: {}, queue offset: {}, max message num: {}",
+ LOGGER.warn("TieredMessageFetcher#getMessageFromCacheAsync: cache
miss: " +
+ "topic: {}, queue: {}, queue offset: {}, max message num: {}",
mq.getTopic(), mq.getQueueId(), queueOffset, maxCount);
+
CompletableFuture<GetMessageResult> resultFuture;
synchronized (flatFile) {
int batchSize = maxCount * storeConfig.getReadAheadMinFactor();
@@ -453,42 +458,42 @@ public class TieredMessageFetcher implements
MessageStoreFetcher {
public CompletableFuture<GetMessageResult> getMessageAsync(
String group, String topic, int queueId, long queueOffset, int
maxCount, final MessageFilter messageFilter) {
+ GetMessageResult result = new GetMessageResult();
CompositeQueueFlatFile flatFile = flatFileManager.getFlatFile(new
MessageQueue(topic, brokerName, queueId));
+
if (flatFile == null) {
- GetMessageResult result = new GetMessageResult();
result.setNextBeginOffset(queueOffset);
result.setStatus(GetMessageStatus.NO_MATCHED_LOGIC_QUEUE);
return CompletableFuture.completedFuture(result);
}
- GetMessageResult result = new GetMessageResult();
- long minQueueOffset = flatFile.getConsumeQueueMinOffset();
- long maxQueueOffset = flatFile.getConsumeQueueCommitOffset();
- result.setMinOffset(minQueueOffset);
- result.setMaxOffset(maxQueueOffset);
+ // Max queue offset means next message put position
+ result.setMinOffset(flatFile.getConsumeQueueMinOffset());
+ result.setMaxOffset(flatFile.getConsumeQueueCommitOffset());
+
+ // Fill result according file offset.
+ // Offset range | Result | Fix to
+ // (-oo, 0] | no message | current offset
+ // (0, min) | too small | min offset
+ // [min, max) | correct |
+ // [max, max] | overflow one | max offset
+ // (max, +oo) | overflow badly | max offset
- if (flatFile.getConsumeQueueCommitOffset() <= 0) {
+ if (result.getMaxOffset() <= 0) {
result.setStatus(GetMessageStatus.NO_MESSAGE_IN_QUEUE);
result.setNextBeginOffset(queueOffset);
return CompletableFuture.completedFuture(result);
- }
-
- // request range | result
- // (0, min) | too small
- // [min, max) | correct
- // [max, max] | overflow one
- // (max, +oo) | overflow badly
- if (queueOffset < minQueueOffset) {
+ } else if (queueOffset < result.getMinOffset()) {
result.setStatus(GetMessageStatus.OFFSET_TOO_SMALL);
- result.setNextBeginOffset(flatFile.getConsumeQueueMinOffset());
+ result.setNextBeginOffset(result.getMinOffset());
return CompletableFuture.completedFuture(result);
- } else if (queueOffset == maxQueueOffset) {
+ } else if (queueOffset == result.getMaxOffset()) {
result.setStatus(GetMessageStatus.OFFSET_OVERFLOW_ONE);
- result.setNextBeginOffset(flatFile.getConsumeQueueCommitOffset());
+ result.setNextBeginOffset(result.getMaxOffset());
return CompletableFuture.completedFuture(result);
- } else if (queueOffset > maxQueueOffset) {
+ } else if (queueOffset > result.getMaxOffset()) {
result.setStatus(GetMessageStatus.OFFSET_OVERFLOW_BADLY);
- result.setNextBeginOffset(flatFile.getConsumeQueueCommitOffset());
+ result.setNextBeginOffset(result.getMaxOffset());
return CompletableFuture.completedFuture(result);
}
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
index 5240ac8e9e..78e855f365 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
@@ -99,11 +99,11 @@ public class TieredMessageStore extends
AbstractPluginMessageStore {
return storeConfig;
}
- public boolean viaTieredStorage(String topic, int queueId, long offset) {
- return viaTieredStorage(topic, queueId, offset, 1);
+ public boolean fetchFromCurrentStore(String topic, int queueId, long
offset) {
+ return fetchFromCurrentStore(topic, queueId, offset, 1);
}
- public boolean viaTieredStorage(String topic, int queueId, long offset,
int batchSize) {
+ public boolean fetchFromCurrentStore(String topic, int queueId, long
offset, int batchSize) {
TieredMessageStoreConfig.TieredStorageLevel deepStorageLevel =
storeConfig.getTieredStorageLevel();
if
(deepStorageLevel.check(TieredMessageStoreConfig.TieredStorageLevel.FORCE)) {
@@ -146,8 +146,10 @@ public class TieredMessageStore extends
AbstractPluginMessageStore {
public CompletableFuture<GetMessageResult> getMessageAsync(String group,
String topic,
int queueId, long offset, int maxMsgNums, MessageFilter messageFilter)
{
- if (!viaTieredStorage(topic, queueId, offset, maxMsgNums)) {
- logger.trace("GetMessageAsync from next store topic: {}, queue:
{}, offset: {}", topic, queueId, offset);
+ if (fetchFromCurrentStore(topic, queueId, offset, maxMsgNums)) {
+ logger.trace("GetMessageAsync from current store, topic: {},
queue: {}, offset: {}", topic, queueId, offset);
+ } else {
+ logger.trace("GetMessageAsync from next store, topic: {}, queue:
{}, offset: {}", topic, queueId, offset);
return next.getMessageAsync(group, topic, queueId, offset,
maxMsgNums, messageFilter);
}
@@ -168,14 +170,14 @@ public class TieredMessageStore extends
AbstractPluginMessageStore {
if (next.checkInStoreByConsumeOffset(topic, queueId,
offset)) {
TieredStoreMetricsManager.fallbackTotal.add(1,
latencyAttributes);
- logger.debug("GetMessageAsync not found then try back
to next store, result: {}, " +
+ logger.debug("GetMessageAsync not found, then back to
next store, result: {}, " +
"topic: {}, queue: {}, queue offset: {},
offset range: {}-{}",
result.getStatus(), topic, queueId, offset,
result.getMinOffset(), result.getMaxOffset());
return next.getMessage(group, topic, queueId, offset,
maxMsgNums, messageFilter);
}
}
- // system topic
+ // Fetch system topic data from the broker when using the
force level.
if (result.getStatus() ==
GetMessageStatus.NO_MATCHED_LOGIC_QUEUE) {
if (TieredStoreUtil.isSystemTopic(topic) ||
PopAckConstants.isStartWithRevivePrefix(topic)) {
return next.getMessage(group, topic, queueId, offset,
maxMsgNums, messageFilter);
@@ -198,7 +200,7 @@ public class TieredMessageStore extends
AbstractPluginMessageStore {
TieredStoreMetricsManager.messagesOutTotal.add(result.getMessageCount(),
messagesOutAttributes);
}
- // fix min or max offset according next store
+ // Fix min or max offset according next store at last
long minOffsetInQueue = next.getMinOffsetInQueue(topic,
queueId);
if (minOffsetInQueue >= 0 && minOffsetInQueue <
result.getMinOffset()) {
result.setMinOffset(minOffsetInQueue);
@@ -209,7 +211,7 @@ public class TieredMessageStore extends
AbstractPluginMessageStore {
}
return result;
}).exceptionally(e -> {
- logger.error("GetMessageAsync from tiered store failed: ", e);
+ logger.error("GetMessageAsync from tiered store failed", e);
return next.getMessage(group, topic, queueId, offset,
maxMsgNums, messageFilter);
});
}
@@ -251,7 +253,7 @@ public class TieredMessageStore extends
AbstractPluginMessageStore {
.build();
TieredStoreMetricsManager.apiLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS),
latencyAttributes);
if (time < 0) {
-
logger.debug("TieredMessageStore#getEarliestMessageTimeAsync: get earliest
message time failed, try to get earliest message time from next store: topic:
{}, queue: {}",
+ logger.debug("GetEarliestMessageTimeAsync failed, try to
get earliest message time from next store: topic: {}, queue: {}",
topic, queueId);
return finalNextEarliestMessageTime != Long.MAX_VALUE ?
finalNextEarliestMessageTime : -1;
}
@@ -262,7 +264,7 @@ public class TieredMessageStore extends
AbstractPluginMessageStore {
@Override
public CompletableFuture<Long> getMessageStoreTimeStampAsync(String topic,
int queueId,
long consumeQueueOffset) {
- if (viaTieredStorage(topic, queueId, consumeQueueOffset)) {
+ if (fetchFromCurrentStore(topic, queueId, consumeQueueOffset)) {
Stopwatch stopwatch = Stopwatch.createStarted();
return fetcher.getMessageStoreTimeStampAsync(topic, queueId,
consumeQueueOffset)
.thenApply(time -> {
@@ -272,7 +274,7 @@ public class TieredMessageStore extends
AbstractPluginMessageStore {
.build();
TieredStoreMetricsManager.apiLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS),
latencyAttributes);
if (time == -1) {
-
logger.debug("TieredMessageStore#getMessageStoreTimeStampAsync: get message
time failed, try to get message time from next store: topic: {}, queue: {},
queue offset: {}",
+ logger.debug("GetEarliestMessageTimeAsync failed, try
to get message time from next store, topic: {}, queue: {}, queue offset: {}",
topic, queueId, consumeQueueOffset);
return next.getMessageStoreTimeStamp(topic, queueId,
consumeQueueOffset);
}
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java
index 5062c7d9e7..32911a6e89 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java
@@ -16,14 +16,11 @@
*/
package org.apache.rocketmq.tieredstore.provider;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Stopwatch;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -35,8 +32,8 @@ import
org.apache.rocketmq.tieredstore.exception.TieredStoreException;
import org.apache.rocketmq.tieredstore.file.TieredCommitLog;
import org.apache.rocketmq.tieredstore.file.TieredConsumeQueue;
import org.apache.rocketmq.tieredstore.file.TieredIndexFile;
-import
org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream;
-import
org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStreamFactory;
+import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream;
+import
org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStreamFactory;
import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
@@ -50,22 +47,23 @@ public abstract class TieredFileSegment implements
Comparable<TieredFileSegment>
protected final TieredMessageStoreConfig storeConfig;
private final long maxSize;
- private final ReentrantLock bufferLock;
- private final Semaphore commitLock;
+ private final ReentrantLock bufferLock = new ReentrantLock();
+ private final Semaphore commitLock = new Semaphore(1);
- private volatile boolean full;
- private volatile boolean closed;
+ private volatile boolean full = false;
+ private volatile boolean closed = false;
- private volatile long minTimestamp;
- private volatile long maxTimestamp;
- private volatile long commitPosition;
- private volatile long appendPosition;
+ private volatile long minTimestamp = Long.MAX_VALUE;
+ private volatile long maxTimestamp = Long.MAX_VALUE;
+ private volatile long commitPosition = 0L;
+ private volatile long appendPosition = 0L;
// only used in commitLog
- private volatile long dispatchCommitOffset = 0;
+ private volatile long dispatchCommitOffset = 0L;
private ByteBuffer codaBuffer;
- private List<ByteBuffer> uploadBufferList = new ArrayList<>();
+ private List<ByteBuffer> bufferList = new ArrayList<>();
+ private FileSegmentInputStream fileSegmentInputStream;
private CompletableFuture<Boolean> flightCommitRequest =
CompletableFuture.completedFuture(false);
public TieredFileSegment(TieredMessageStoreConfig storeConfig,
@@ -75,21 +73,13 @@ public abstract class TieredFileSegment implements
Comparable<TieredFileSegment>
this.fileType = fileType;
this.filePath = filePath;
this.baseOffset = baseOffset;
-
- this.closed = false;
- this.bufferLock = new ReentrantLock();
- this.commitLock = new Semaphore(1);
-
- this.commitPosition = 0L;
- this.appendPosition = 0L;
- this.minTimestamp = Long.MAX_VALUE;
- this.maxTimestamp = Long.MAX_VALUE;
-
- // The max segment size of a file is determined by the file type
- this.maxSize = getMaxSizeAccordingFileType(storeConfig);
+ this.maxSize = getMaxSizeByFileType();
}
- private long getMaxSizeAccordingFileType(TieredMessageStoreConfig
storeConfig) {
+ /**
+ * The max segment size of a file is determined by the file type
+ */
+ protected long getMaxSizeByFileType() {
switch (fileType) {
case COMMIT_LOG:
return storeConfig.getTieredStoreCommitLogMaxSize();
@@ -184,39 +174,23 @@ public abstract class TieredFileSegment implements
Comparable<TieredFileSegment>
this.appendPosition = pos;
}
- private List<ByteBuffer> rollingUploadBuffer() {
+ private List<ByteBuffer> borrowBuffer() {
bufferLock.lock();
try {
- List<ByteBuffer> tmp = uploadBufferList;
- uploadBufferList = new ArrayList<>();
+ List<ByteBuffer> tmp = bufferList;
+ bufferList = new ArrayList<>();
return tmp;
} finally {
bufferLock.unlock();
}
}
- private void sendBackBuffer(TieredFileSegmentInputStream inputStream) {
- bufferLock.lock();
- try {
- List<ByteBuffer> tmpBufferList = inputStream.getUploadBufferList();
- for (ByteBuffer buffer : tmpBufferList) {
- buffer.rewind();
- }
- tmpBufferList.addAll(uploadBufferList);
- uploadBufferList = tmpBufferList;
- if (inputStream.getCodaBuffer() != null) {
- codaBuffer.rewind();
- }
- } finally {
- bufferLock.unlock();
- }
- }
-
@SuppressWarnings("NonAtomicOperationOnVolatileField")
- public AppendResult append(ByteBuffer byteBuf, long timeStamp) {
+ public AppendResult append(ByteBuffer byteBuf, long timestamp) {
if (closed) {
return AppendResult.FILE_CLOSED;
}
+
bufferLock.lock();
try {
if (full || codaBuffer != null) {
@@ -227,7 +201,8 @@ public abstract class TieredFileSegment implements
Comparable<TieredFileSegment>
minTimestamp =
byteBuf.getLong(TieredIndexFile.INDEX_FILE_HEADER_BEGIN_TIME_STAMP_POSITION);
maxTimestamp =
byteBuf.getLong(TieredIndexFile.INDEX_FILE_HEADER_END_TIME_STAMP_POSITION);
appendPosition += byteBuf.remaining();
- uploadBufferList.add(byteBuf);
+ // IndexFile is large and not change after compaction, no need
deep copy
+ bufferList.add(byteBuf);
setFull();
return AppendResult.SUCCESS;
}
@@ -236,23 +211,34 @@ public abstract class TieredFileSegment implements
Comparable<TieredFileSegment>
setFull();
return AppendResult.FILE_FULL;
}
- if (uploadBufferList.size() >
storeConfig.getTieredStoreGroupCommitCount()
+
+ if (bufferList.size() >
storeConfig.getTieredStoreGroupCommitCount()
|| appendPosition - commitPosition >
storeConfig.getTieredStoreGroupCommitSize()) {
commitAsync();
}
- if (uploadBufferList.size() >
storeConfig.getTieredStoreMaxGroupCommitCount()) {
- logger.debug("TieredFileSegment#append: buffer full: file: {},
upload buffer size: {}",
- getPath(), uploadBufferList.size());
+
+ if (bufferList.size() >
storeConfig.getTieredStoreMaxGroupCommitCount()) {
+ logger.debug("File segment append buffer full, file: {},
buffer size: {}, pending bytes: {}",
+ getPath(), bufferList.size(), appendPosition -
commitPosition);
return AppendResult.BUFFER_FULL;
}
- if (timeStamp != Long.MAX_VALUE) {
- maxTimestamp = timeStamp;
+
+ if (timestamp != Long.MAX_VALUE) {
+ maxTimestamp = timestamp;
if (minTimestamp == Long.MAX_VALUE) {
- minTimestamp = timeStamp;
+ minTimestamp = timestamp;
}
}
+
appendPosition += byteBuf.remaining();
- uploadBufferList.add(byteBuf);
+
+ // deep copy buffer
+ ByteBuffer byteBuffer =
ByteBuffer.allocateDirect(byteBuf.remaining());
+ byteBuffer.put(byteBuf);
+ byteBuffer.flip();
+ byteBuf.rewind();
+
+ bufferList.add(byteBuffer);
return AppendResult.SUCCESS;
} finally {
bufferLock.unlock();
@@ -267,7 +253,6 @@ public abstract class TieredFileSegment implements
Comparable<TieredFileSegment>
return appendPosition;
}
- @VisibleForTesting
public void setAppendPosition(long appendPosition) {
this.appendPosition = appendPosition;
}
@@ -333,6 +318,8 @@ public abstract class TieredFileSegment implements
Comparable<TieredFileSegment>
if (closed) {
return false;
}
+ // result is false when we send real commit request
+ // use join for wait flight request done
Boolean result = commitAsync().join();
if (!result) {
result = flightCommitRequest.join();
@@ -340,92 +327,156 @@ public abstract class TieredFileSegment implements
Comparable<TieredFileSegment>
return result;
}
+ private void releaseCommitLock() {
+ if (commitLock.availablePermits() == 0) {
+ commitLock.release();
+ } else {
+ logger.error("[Bug] FileSegmentCommitAsync, lock is already
released: available permits: {}",
+ commitLock.availablePermits());
+ }
+ }
+
+ private void updateDispatchCommitOffset(List<ByteBuffer> bufferList) {
+ if (fileType == FileSegmentType.COMMIT_LOG && bufferList.size() > 0) {
+ dispatchCommitOffset =
+
MessageBufferUtil.getQueueOffset(bufferList.get(bufferList.size() - 1));
+ }
+ }
+
+ /**
+ * @return false: commit, true: no commit operation
+ */
@SuppressWarnings("NonAtomicOperationOnVolatileField")
public CompletableFuture<Boolean> commitAsync() {
if (closed) {
return CompletableFuture.completedFuture(false);
}
- Stopwatch stopwatch = Stopwatch.createStarted();
+
if (!needCommit()) {
return CompletableFuture.completedFuture(true);
}
- try {
- int permits = commitLock.drainPermits();
- if (permits <= 0) {
- return CompletableFuture.completedFuture(false);
- }
- } catch (Exception e) {
+
+ if (commitLock.drainPermits() <= 0) {
return CompletableFuture.completedFuture(false);
}
- List<ByteBuffer> bufferList = rollingUploadBuffer();
- int bufferSize = 0;
- for (ByteBuffer buffer : bufferList) {
- bufferSize += buffer.remaining();
- }
- if (codaBuffer != null) {
- bufferSize += codaBuffer.remaining();
- }
- if (bufferSize == 0) {
- return CompletableFuture.completedFuture(true);
- }
- TieredFileSegmentInputStream inputStream =
TieredFileSegmentInputStreamFactory.build(
- fileType, baseOffset + commitPosition, bufferList, codaBuffer,
bufferSize);
- int finalBufferSize = bufferSize;
+
try {
- flightCommitRequest = commit0(inputStream, commitPosition,
bufferSize, fileType != FileSegmentType.INDEX)
+ if (fileSegmentInputStream != null) {
+ long fileSize = this.getSize();
+ if (fileSize == -1L) {
+ logger.error("Get commit position error before commit,
Commit: %d, Expect: %d, Current Max: %d, FileName: %s",
+ commitPosition, commitPosition +
fileSegmentInputStream.getContentLength(), appendPosition, getPath());
+ releaseCommitLock();
+ return CompletableFuture.completedFuture(false);
+ } else {
+ if (correctPosition(fileSize, null)) {
+
updateDispatchCommitOffset(fileSegmentInputStream.getBufferList());
+ fileSegmentInputStream = null;
+ }
+ }
+ }
+
+ int bufferSize;
+ if (fileSegmentInputStream != null) {
+ bufferSize = fileSegmentInputStream.available();
+ } else {
+ List<ByteBuffer> bufferList = borrowBuffer();
+ bufferSize =
bufferList.stream().mapToInt(ByteBuffer::remaining).sum()
+ + (codaBuffer != null ? codaBuffer.remaining() : 0);
+ if (bufferSize == 0) {
+ releaseCommitLock();
+ return CompletableFuture.completedFuture(true);
+ }
+ fileSegmentInputStream = FileSegmentInputStreamFactory.build(
+ fileType, baseOffset + commitPosition, bufferList,
codaBuffer, bufferSize);
+ }
+
+ return flightCommitRequest = this
+ .commit0(fileSegmentInputStream, commitPosition, bufferSize,
fileType != FileSegmentType.INDEX)
.thenApply(result -> {
if (result) {
- if (fileType == FileSegmentType.COMMIT_LOG &&
bufferList.size() > 0) {
- dispatchCommitOffset =
MessageBufferUtil.getQueueOffset(bufferList.get(bufferList.size() - 1));
- }
- commitPosition += finalBufferSize;
+
updateDispatchCommitOffset(fileSegmentInputStream.getBufferList());
+ commitPosition += bufferSize;
+ fileSegmentInputStream = null;
return true;
- }
- sendBackBuffer(inputStream);
- return false;
- })
- .exceptionally(e -> handleCommitException(inputStream, e))
- .whenComplete((result, e) -> {
- if (commitLock.availablePermits() == 0) {
- logger.debug("TieredFileSegment#commitAsync: commit
cost: {}ms, file: {}, item count: {}, buffer size: {}",
stopwatch.elapsed(TimeUnit.MILLISECONDS), getPath(), bufferList.size(),
finalBufferSize);
- commitLock.release();
} else {
- logger.error("[Bug]TieredFileSegment#commitAsync:
commit lock is already released: available permits: {}",
commitLock.availablePermits());
+ fileSegmentInputStream.rewind();
+ return false;
}
- });
- return flightCommitRequest;
+ })
+ .exceptionally(this::handleCommitException)
+ .whenComplete((result, e) -> releaseCommitLock());
+
} catch (Exception e) {
- handleCommitException(inputStream, e);
- if (commitLock.availablePermits() == 0) {
- logger.debug("TieredFileSegment#commitAsync: commit cost:
{}ms, file: {}, item count: {}, buffer size: {}",
stopwatch.elapsed(TimeUnit.MILLISECONDS), getPath(), bufferList.size(),
finalBufferSize);
- commitLock.release();
- } else {
- logger.error("[Bug]TieredFileSegment#commitAsync: commit lock
is already released: available permits: {}", commitLock.availablePermits());
- }
+ handleCommitException(e);
+ releaseCommitLock();
}
return CompletableFuture.completedFuture(false);
}
- private boolean handleCommitException(TieredFileSegmentInputStream
inputStream, Throwable e) {
+ private long getCorrectFileSize(Throwable throwable) {
+ if (throwable instanceof TieredStoreException) {
+ long fileSize = ((TieredStoreException) throwable).getPosition();
+ if (fileSize > 0) {
+ return fileSize;
+ }
+ }
+ return getSize();
+ }
+
+ private boolean handleCommitException(Throwable e) {
+ // Get root cause here
Throwable cause = e.getCause() != null ? e.getCause() : e;
- sendBackBuffer(inputStream);
- long realSize = 0;
- if (cause instanceof TieredStoreException && ((TieredStoreException)
cause).getPosition() > 0) {
- realSize = ((TieredStoreException) cause).getPosition();
+ long fileSize = this.getCorrectFileSize(cause);
+
+ if (fileSize == -1L) {
+ logger.error("Get commit position error, Commit: %d, Expect: %d,
Current Max: %d, FileName: %s",
+ commitPosition, commitPosition +
fileSegmentInputStream.getContentLength(), appendPosition, getPath());
+ fileSegmentInputStream.rewind();
+ return false;
+ }
+
+ if (correctPosition(fileSize, cause)) {
+ updateDispatchCommitOffset(fileSegmentInputStream.getBufferList());
+ fileSegmentInputStream = null;
+ return true;
+ } else {
+ fileSegmentInputStream.rewind();
+ return false;
}
- if (realSize <= 0) {
- realSize = getSize();
+ }
+
+ /**
+ * return true to clear buffer
+ */
+ private boolean correctPosition(long fileSize, Throwable throwable) {
+
+ // Current we have three offsets here: commit offset, expect offset,
file size.
+ // We guarantee that the commit offset is less than or equal to the
expect offset.
+ // Max offset will increase because we can continuously put in new
buffers
+ String handleInfo = throwable == null ? "before commit" : "after
commit";
+ long expectPosition = commitPosition +
fileSegmentInputStream.getContentLength();
+
+ String offsetInfo = String.format("Correct Commit Position, %s,
result=[{}], " +
+ "Commit: %d, Expect: %d, Current Max: %d, FileSize: %d,
FileName: %s",
+ handleInfo, commitPosition, expectPosition, appendPosition,
fileSize, this.getPath());
+
+ // We are believing that the file size returned by the server is
correct,
+ // can reset the commit offset to the file size reported by the
storage system.
+ if (fileSize == expectPosition) {
+ logger.info(offsetInfo, "Success", throwable);
+ commitPosition = fileSize;
+ return true;
}
- if (realSize > 0 && realSize > commitPosition) {
- logger.error("TieredFileSegment#handleCommitException: commit
failed: file: {}, try to fix position: origin: {}, real: {}", getPath(),
commitPosition, realSize, cause);
- // TODO check if this diff part is uploaded to backend storage
- long diff = appendPosition - commitPosition;
- commitPosition = realSize;
- appendPosition = realSize + diff;
- // TODO check if appendPosition is large than maxOffset
- } else if (realSize < commitPosition) {
- logger.error("[Bug]TieredFileSegment#handleCommitException: commit
failed: file: {}, can not fix position: origin: {}, real: {}", getPath(),
commitPosition, realSize, cause);
+
+ if (fileSize < commitPosition) {
+ logger.error(offsetInfo, "FileSizeIncorrect", throwable);
+ } else if (fileSize == commitPosition) {
+ logger.warn(offsetInfo, "CommitFailed", throwable);
+ } else if (fileSize > commitPosition) {
+ logger.warn(offsetInfo, "PartialSuccess", throwable);
}
+ commitPosition = fileSize;
return false;
}
}
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java
index 5a0ca25f59..0db3eaf8f4 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java
@@ -18,7 +18,7 @@ package org.apache.rocketmq.tieredstore.provider;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
-import
org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream;
+import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream;
public interface TieredStoreProvider {
@@ -30,7 +30,9 @@ public interface TieredStoreProvider {
String getPath();
/**
- * Get file size in backend file system
+ * Get the real length of the file.
+ * Return 0 if the file does not exist,
+ * Return -1 if system get size failed.
*
* @return file real size
*/
@@ -71,5 +73,5 @@ public interface TieredStoreProvider {
* @param append try to append or create a new file
* @return put result, <code>true</code> if data successfully write;
<code>false</code> otherwise
*/
- CompletableFuture<Boolean> commit0(TieredFileSegmentInputStream
inputStream,long position, int length, boolean append);
+ CompletableFuture<Boolean> commit0(FileSegmentInputStream inputStream,long
position, int length, boolean append);
}
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
index 52be90b1df..7e949cb28c 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
@@ -36,7 +36,7 @@ import
org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsManager;
import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
-import
org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream;
+import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
import static
org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_FILE_TYPE;
@@ -184,7 +184,7 @@ public class PosixFileSegment extends TieredFileSegment {
@Override
public CompletableFuture<Boolean> commit0(
- TieredFileSegmentInputStream inputStream, long position, int length,
boolean append) {
+ FileSegmentInputStream inputStream, long position, int length, boolean
append) {
Stopwatch stopwatch = Stopwatch.createStarted();
AttributesBuilder attributesBuilder = newAttributesBuilder()
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredCommitLogInputStream.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/CommitLogInputStream.java
similarity index 88%
rename from
tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredCommitLogInputStream.java
rename to
tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/CommitLogInputStream.java
index c70bb76562..13b6e0ef9c 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredCommitLogInputStream.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/CommitLogInputStream.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.rocketmq.tieredstore.provider.inputstream;
+package org.apache.rocketmq.tieredstore.provider.stream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -23,20 +23,23 @@ import java.util.List;
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
-public class TieredCommitLogInputStream extends TieredFileSegmentInputStream {
+public class CommitLogInputStream extends FileSegmentInputStream {
/**
* commitLogOffset is the real physical offset of the commitLog buffer
which is being read
*/
+ private final long startCommitLogOffset;
+
private long commitLogOffset;
private final ByteBuffer codaBuffer;
private long markCommitLogOffset = -1;
- public TieredCommitLogInputStream(FileSegmentType fileType, long
startOffset,
+ public CommitLogInputStream(FileSegmentType fileType, long startOffset,
List<ByteBuffer> uploadBufferList, ByteBuffer codaBuffer, int
contentLength) {
super(fileType, uploadBufferList, contentLength);
+ this.startCommitLogOffset = startOffset;
this.commitLogOffset = startOffset;
this.codaBuffer = codaBuffer;
}
@@ -53,6 +56,15 @@ public class TieredCommitLogInputStream extends
TieredFileSegmentInputStream {
this.commitLogOffset = markCommitLogOffset;
}
+ @Override
+ public synchronized void rewind() {
+ super.rewind();
+ this.commitLogOffset = this.startCommitLogOffset;
+ if (this.codaBuffer != null) {
+ this.codaBuffer.rewind();
+ }
+ }
+
@Override
public ByteBuffer getCodaBuffer() {
return this.codaBuffer;
@@ -64,17 +76,17 @@ public class TieredCommitLogInputStream extends
TieredFileSegmentInputStream {
return -1;
}
readPosition++;
- if (curReadBufferIndex >= uploadBufferList.size()) {
+ if (curReadBufferIndex >= bufferList.size()) {
return readCoda();
}
int res;
if (readPosInCurBuffer >= curBuffer.remaining()) {
curReadBufferIndex++;
- if (curReadBufferIndex >= uploadBufferList.size()) {
+ if (curReadBufferIndex >= bufferList.size()) {
readPosInCurBuffer = 0;
return readCoda();
}
- curBuffer = uploadBufferList.get(curReadBufferIndex);
+ curBuffer = bufferList.get(curReadBufferIndex);
commitLogOffset += readPosInCurBuffer;
readPosInCurBuffer = 0;
}
@@ -119,9 +131,9 @@ public class TieredCommitLogInputStream extends
TieredFileSegmentInputStream {
int posInCurBuffer = readPosInCurBuffer;
long curCommitLogOffset = commitLogOffset;
ByteBuffer curBuf = curBuffer;
- while (needRead > 0 && bufIndex <= uploadBufferList.size()) {
+ while (needRead > 0 && bufIndex <= bufferList.size()) {
int readLen, remaining, realReadLen = 0;
- if (bufIndex == uploadBufferList.size()) {
+ if (bufIndex == bufferList.size()) {
// read from coda buffer
remaining = codaBuffer.remaining() - posInCurBuffer;
readLen = Math.min(remaining, needRead);
@@ -137,7 +149,7 @@ public class TieredCommitLogInputStream extends
TieredFileSegmentInputStream {
}
remaining = curBuf.remaining() - posInCurBuffer;
readLen = Math.min(remaining, needRead);
- curBuf = uploadBufferList.get(bufIndex);
+ curBuf = bufferList.get(bufIndex);
if (posInCurBuffer < MessageBufferUtil.PHYSICAL_OFFSET_POSITION) {
realReadLen =
Math.min(MessageBufferUtil.PHYSICAL_OFFSET_POSITION - posInCurBuffer, readLen);
// read from commitLog buffer
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStream.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/FileSegmentInputStream.java
similarity index 77%
rename from
tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStream.java
rename to
tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/FileSegmentInputStream.java
index e1758ca934..9e9d5135cd 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStream.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/FileSegmentInputStream.java
@@ -15,15 +15,16 @@
* limitations under the License.
*/
-package org.apache.rocketmq.tieredstore.provider.inputstream;
+package org.apache.rocketmq.tieredstore.provider.stream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.List;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
-public class TieredFileSegmentInputStream extends InputStream {
+public class FileSegmentInputStream extends InputStream {
/**
* file type, can be commitlog, consume queue or indexfile now
@@ -33,7 +34,7 @@ public class TieredFileSegmentInputStream extends InputStream
{
/**
* hold bytebuffer
*/
- protected final List<ByteBuffer> uploadBufferList;
+ protected final List<ByteBuffer> bufferList;
/**
* total remaining of bytebuffer list
@@ -65,13 +66,13 @@ public class TieredFileSegmentInputStream extends
InputStream {
private int markReadPosInCurBuffer = -1;
- public TieredFileSegmentInputStream(FileSegmentType fileType,
List<ByteBuffer> uploadBufferList,
- int contentLength) {
+ public FileSegmentInputStream(
+ FileSegmentType fileType, List<ByteBuffer> bufferList, int
contentLength) {
this.fileType = fileType;
this.contentLength = contentLength;
- this.uploadBufferList = uploadBufferList;
- if (uploadBufferList != null && uploadBufferList.size() > 0) {
- this.curBuffer = uploadBufferList.get(curReadBufferIndex);
+ this.bufferList = bufferList;
+ if (bufferList != null && bufferList.size() > 0) {
+ this.curBuffer = bufferList.get(curReadBufferIndex);
}
}
@@ -95,18 +96,34 @@ public class TieredFileSegmentInputStream extends
InputStream {
this.readPosition = markReadPosition;
this.curReadBufferIndex = markCurReadBufferIndex;
this.readPosInCurBuffer = markReadPosInCurBuffer;
- if (this.curReadBufferIndex < uploadBufferList.size()) {
- this.curBuffer = uploadBufferList.get(curReadBufferIndex);
+ if (this.curReadBufferIndex < bufferList.size()) {
+ this.curBuffer = bufferList.get(curReadBufferIndex);
}
}
+ public synchronized void rewind() {
+ this.readPosition = 0;
+ this.curReadBufferIndex = 0;
+ this.readPosInCurBuffer = 0;
+ if (CollectionUtils.isNotEmpty(bufferList)) {
+ this.curBuffer = bufferList.get(0);
+ for (ByteBuffer buffer : bufferList) {
+ buffer.rewind();
+ }
+ }
+ }
+
+ public int getContentLength() {
+ return contentLength;
+ }
+
@Override
public int available() {
return contentLength - readPosition;
}
- public List<ByteBuffer> getUploadBufferList() {
- return uploadBufferList;
+ public List<ByteBuffer> getBufferList() {
+ return bufferList;
}
public ByteBuffer getCodaBuffer() {
@@ -121,10 +138,10 @@ public class TieredFileSegmentInputStream extends
InputStream {
readPosition++;
if (readPosInCurBuffer >= curBuffer.remaining()) {
curReadBufferIndex++;
- if (curReadBufferIndex >= uploadBufferList.size()) {
+ if (curReadBufferIndex >= bufferList.size()) {
return -1;
}
- curBuffer = uploadBufferList.get(curReadBufferIndex);
+ curBuffer = bufferList.get(curReadBufferIndex);
readPosInCurBuffer = 0;
}
return curBuffer.get(readPosInCurBuffer++) & 0xff;
@@ -153,8 +170,8 @@ public class TieredFileSegmentInputStream extends
InputStream {
int bufIndex = curReadBufferIndex;
int posInCurBuffer = readPosInCurBuffer;
ByteBuffer curBuf = curBuffer;
- while (needRead > 0 && bufIndex < uploadBufferList.size()) {
- curBuf = uploadBufferList.get(bufIndex);
+ while (needRead > 0 && bufIndex < bufferList.size()) {
+ curBuf = bufferList.get(bufIndex);
int remaining = curBuf.remaining() - posInCurBuffer;
int readLen = Math.min(remaining, needRead);
// read from curBuf
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStreamFactory.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/FileSegmentInputStreamFactory.java
similarity index 54%
rename from
tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStreamFactory.java
rename to
tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/FileSegmentInputStreamFactory.java
index d0c983fd43..a90baff3ae 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStreamFactory.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/FileSegmentInputStreamFactory.java
@@ -15,30 +15,34 @@
* limitations under the License.
*/
-package org.apache.rocketmq.tieredstore.provider.inputstream;
+package org.apache.rocketmq.tieredstore.provider.stream;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
-public class TieredFileSegmentInputStreamFactory {
+public class FileSegmentInputStreamFactory {
- public static TieredFileSegmentInputStream build(FileSegmentType fileType,
- long startOffset, List<ByteBuffer> uploadBufferList, ByteBuffer
codaBuffer, int contentLength) {
+ public static FileSegmentInputStream build(
+ FileSegmentType fileType, long offset, List<ByteBuffer> bufferList,
ByteBuffer byteBuffer, int length) {
+
+ if (bufferList == null) {
+ throw new IllegalArgumentException("bufferList is null");
+ }
switch (fileType) {
case COMMIT_LOG:
- return new TieredCommitLogInputStream(
- fileType, startOffset, uploadBufferList, codaBuffer,
contentLength);
+ return new CommitLogInputStream(
+ fileType, offset, bufferList, byteBuffer, length);
case CONSUME_QUEUE:
- return new TieredFileSegmentInputStream(fileType,
uploadBufferList, contentLength);
+ return new FileSegmentInputStream(fileType, bufferList,
length);
case INDEX:
- if (uploadBufferList.size() != 1) {
- throw new IllegalArgumentException("uploadBufferList size
in INDEX type input stream must be 1");
+ if (bufferList.size() != 1) {
+ throw new IllegalArgumentException("buffer block size must
be 1 when file type is IndexFile");
}
- return new TieredFileSegmentInputStream(fileType,
uploadBufferList, contentLength);
+ return new FileSegmentInputStream(fileType, bufferList,
length);
default:
- throw new IllegalArgumentException("fileType is not
supported");
+ throw new IllegalArgumentException("file type is not
supported");
}
}
}
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
index 8601392e74..2451199c28 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
@@ -130,36 +130,36 @@ public class TieredMessageStoreTest {
// TieredStorageLevel.DISABLE
properties.setProperty("tieredStorageLevel", "0");
configuration.update(properties);
- Assert.assertFalse(store.viaTieredStorage(mq.getTopic(),
mq.getQueueId(), 0));
+ Assert.assertFalse(store.fetchFromCurrentStore(mq.getTopic(),
mq.getQueueId(), 0));
// TieredStorageLevel.NOT_IN_DISK
properties.setProperty("tieredStorageLevel", "1");
configuration.update(properties);
when(nextStore.checkInStoreByConsumeOffset(anyString(), anyInt(),
anyLong())).thenReturn(false);
- Assert.assertTrue(store.viaTieredStorage(mq.getTopic(),
mq.getQueueId(), 0));
+ Assert.assertTrue(store.fetchFromCurrentStore(mq.getTopic(),
mq.getQueueId(), 0));
when(nextStore.checkInStoreByConsumeOffset(anyString(), anyInt(),
anyLong())).thenReturn(true);
- Assert.assertFalse(store.viaTieredStorage(mq.getTopic(),
mq.getQueueId(), 0));
+ Assert.assertFalse(store.fetchFromCurrentStore(mq.getTopic(),
mq.getQueueId(), 0));
// TieredStorageLevel.NOT_IN_MEM
properties.setProperty("tieredStorageLevel", "2");
configuration.update(properties);
Mockito.when(nextStore.checkInStoreByConsumeOffset(anyString(),
anyInt(), anyLong())).thenReturn(false);
Mockito.when(nextStore.checkInMemByConsumeOffset(anyString(),
anyInt(), anyLong(), anyInt())).thenReturn(true);
- Assert.assertTrue(store.viaTieredStorage(mq.getTopic(),
mq.getQueueId(), 0));
+ Assert.assertTrue(store.fetchFromCurrentStore(mq.getTopic(),
mq.getQueueId(), 0));
Mockito.when(nextStore.checkInStoreByConsumeOffset(anyString(),
anyInt(), anyLong())).thenReturn(true);
Mockito.when(nextStore.checkInMemByConsumeOffset(anyString(),
anyInt(), anyLong(), anyInt())).thenReturn(false);
- Assert.assertTrue(store.viaTieredStorage(mq.getTopic(),
mq.getQueueId(), 0));
+ Assert.assertTrue(store.fetchFromCurrentStore(mq.getTopic(),
mq.getQueueId(), 0));
Mockito.when(nextStore.checkInStoreByConsumeOffset(anyString(),
anyInt(), anyLong())).thenReturn(true);
Mockito.when(nextStore.checkInMemByConsumeOffset(anyString(),
anyInt(), anyLong(), anyInt())).thenReturn(true);
- Assert.assertFalse(store.viaTieredStorage(mq.getTopic(),
mq.getQueueId(), 0));
+ Assert.assertFalse(store.fetchFromCurrentStore(mq.getTopic(),
mq.getQueueId(), 0));
// TieredStorageLevel.FORCE
properties.setProperty("tieredStorageLevel", "3");
configuration.update(properties);
- Assert.assertTrue(store.viaTieredStorage(mq.getTopic(),
mq.getQueueId(), 0));
+ Assert.assertTrue(store.fetchFromCurrentStore(mq.getTopic(),
mq.getQueueId(), 0));
}
@Test
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileTest.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileTest.java
index cc39cfbfce..7a4d059690 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileTest.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileTest.java
@@ -24,6 +24,7 @@ import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.tieredstore.TieredStoreTestUtil;
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
+import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
import org.apache.rocketmq.tieredstore.metadata.FileSegmentMetadata;
import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore;
import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
@@ -55,6 +56,7 @@ public class TieredFlatFileTest {
public void tearDown() throws IOException {
TieredStoreTestUtil.destroyMetadataStore();
TieredStoreTestUtil.destroyTempDir(storePath);
+ TieredStoreExecutor.shutdown();
}
private List<FileSegmentMetadata>
getSegmentMetadataList(TieredMetadataStore metadataStore) {
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java
index 262d6645b3..2da72bc7a7 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java
@@ -87,5 +87,7 @@ public class TieredIndexFileTest {
indexList = indexFile.queryAsync(mq.getTopic(), "key1", 1000,
1200).join();
Assert.assertEquals(1, indexList.size());
+
+ indexFile.destroy();
}
}
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/MockTieredFileSegmentInputStream.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/MockFileSegmentInputStream.java
similarity index 82%
rename from
tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/MockTieredFileSegmentInputStream.java
rename to
tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/MockFileSegmentInputStream.java
index a6566b7de5..3bbe41dd4b 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/MockTieredFileSegmentInputStream.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/MockFileSegmentInputStream.java
@@ -20,13 +20,13 @@ package org.apache.rocketmq.tieredstore.provider;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.List;
-import
org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream;
+import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream;
-public class MockTieredFileSegmentInputStream extends
TieredFileSegmentInputStream {
+public class MockFileSegmentInputStream extends FileSegmentInputStream {
private final InputStream inputStream;
- public MockTieredFileSegmentInputStream(InputStream inputStream) {
+ public MockFileSegmentInputStream(InputStream inputStream) {
super(null, null, Integer.MAX_VALUE);
this.inputStream = inputStream;
}
@@ -43,7 +43,7 @@ public class MockTieredFileSegmentInputStream extends
TieredFileSegmentInputStre
}
@Override
- public List<ByteBuffer> getUploadBufferList() {
+ public List<ByteBuffer> getBufferList() {
return null;
}
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentInputStreamTest.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentInputStreamTest.java
index a2554ba3d1..743d9182ce 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentInputStreamTest.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentInputStreamTest.java
@@ -28,8 +28,8 @@ import java.util.Random;
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
import org.apache.rocketmq.tieredstore.file.TieredCommitLog;
import org.apache.rocketmq.tieredstore.file.TieredConsumeQueue;
-import
org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream;
-import
org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStreamFactory;
+import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream;
+import
org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStreamFactory;
import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
import org.apache.rocketmq.tieredstore.util.MessageBufferUtilTest;
import org.junit.Assert;
@@ -57,7 +57,7 @@ public class TieredFileSegmentInputStreamTest {
bufferSize += byteBuffer.remaining();
}
- // build expected byte buffer for verifying the
TieredFileSegmentInputStream
+ // build expected byte buffer for verifying the FileSegmentInputStream
ByteBuffer expectedByteBuffer = ByteBuffer.allocate(bufferSize);
for (ByteBuffer byteBuffer : uploadBufferList) {
expectedByteBuffer.put(byteBuffer);
@@ -74,7 +74,7 @@ public class TieredFileSegmentInputStreamTest {
int[] batchReadSizeTestSet = {
MessageBufferUtil.PHYSICAL_OFFSET_POSITION - 1,
MessageBufferUtil.PHYSICAL_OFFSET_POSITION,
MessageBufferUtil.PHYSICAL_OFFSET_POSITION + 1, MSG_LEN - 1, MSG_LEN, MSG_LEN +
1
};
- verifyReadAndReset(expectedByteBuffer, () ->
TieredFileSegmentInputStreamFactory.build(
+ verifyReadAndReset(expectedByteBuffer, () ->
FileSegmentInputStreamFactory.build(
FileSegmentType.COMMIT_LOG, COMMIT_LOG_START_OFFSET,
uploadBufferList, null, finalBufferSize), finalBufferSize,
batchReadSizeTestSet);
}
@@ -98,7 +98,7 @@ public class TieredFileSegmentInputStreamTest {
int codaBufferSize = codaBuffer.remaining();
bufferSize += codaBufferSize;
- // build expected byte buffer for verifying the
TieredFileSegmentInputStream
+ // build expected byte buffer for verifying the FileSegmentInputStream
ByteBuffer expectedByteBuffer = ByteBuffer.allocate(bufferSize);
for (ByteBuffer byteBuffer : uploadBufferList) {
expectedByteBuffer.put(byteBuffer);
@@ -119,7 +119,7 @@ public class TieredFileSegmentInputStreamTest {
MSG_LEN - 1, MSG_LEN, MSG_LEN + 1,
bufferSize - 1, bufferSize, bufferSize + 1
};
- verifyReadAndReset(expectedByteBuffer, () ->
TieredFileSegmentInputStreamFactory.build(
+ verifyReadAndReset(expectedByteBuffer, () ->
FileSegmentInputStreamFactory.build(
FileSegmentType.COMMIT_LOG, COMMIT_LOG_START_OFFSET,
uploadBufferList, codaBuffer, finalBufferSize), finalBufferSize,
batchReadSizeTestSet);
}
@@ -134,7 +134,7 @@ public class TieredFileSegmentInputStreamTest {
bufferSize += byteBuffer.remaining();
}
- // build expected byte buffer for verifying the
TieredFileSegmentInputStream
+ // build expected byte buffer for verifying the FileSegmentInputStream
ByteBuffer expectedByteBuffer = ByteBuffer.allocate(bufferSize);
for (ByteBuffer byteBuffer : uploadBufferList) {
expectedByteBuffer.put(byteBuffer);
@@ -143,7 +143,7 @@ public class TieredFileSegmentInputStreamTest {
int finalBufferSize = bufferSize;
int[] batchReadSizeTestSet =
{TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE - 1,
TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE,
TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE + 1};
- verifyReadAndReset(expectedByteBuffer, () ->
TieredFileSegmentInputStreamFactory.build(
+ verifyReadAndReset(expectedByteBuffer, () ->
FileSegmentInputStreamFactory.build(
FileSegmentType.CONSUME_QUEUE, COMMIT_LOG_START_OFFSET,
uploadBufferList, null, finalBufferSize), bufferSize, batchReadSizeTestSet);
}
@@ -156,16 +156,16 @@ public class TieredFileSegmentInputStreamTest {
byteBuffer.flip();
List<ByteBuffer> uploadBufferList = Arrays.asList(byteBuffer);
- // build expected byte buffer for verifying the
TieredFileSegmentInputStream
+ // build expected byte buffer for verifying the FileSegmentInputStream
ByteBuffer expectedByteBuffer = byteBuffer.slice();
- verifyReadAndReset(expectedByteBuffer, () ->
TieredFileSegmentInputStreamFactory.build(
+ verifyReadAndReset(expectedByteBuffer, () ->
FileSegmentInputStreamFactory.build(
FileSegmentType.INDEX, COMMIT_LOG_START_OFFSET, uploadBufferList,
null, byteBuffer.limit()), byteBuffer.limit(), new int[] {23, 24, 25});
}
- private void verifyReadAndReset(ByteBuffer expectedByteBuffer,
Supplier<TieredFileSegmentInputStream> constructor,
+ private void verifyReadAndReset(ByteBuffer expectedByteBuffer,
Supplier<FileSegmentInputStream> constructor,
int bufferSize, int[] readBatchSizeTestSet) {
- TieredFileSegmentInputStream inputStream = constructor.get();
+ FileSegmentInputStream inputStream = constructor.get();
// verify
verifyInputStream(inputStream, expectedByteBuffer);
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentTest.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentTest.java
index 4cd83e0d26..a655710a50 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentTest.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentTest.java
@@ -116,13 +116,22 @@ public class TieredFileSegmentTest {
}
@Test
- public void testCommitFailed() {
+ public void testCommitFailedThenSuccess() {
long startTime = System.currentTimeMillis();
MemoryFileSegment segment = (MemoryFileSegment)
createFileSegment(FileSegmentType.COMMIT_LOG);
long lastSize = segment.getSize();
- segment.append(MessageBufferUtilTest.buildMockedMessageBuffer(), 0);
- segment.append(MessageBufferUtilTest.buildMockedMessageBuffer(), 0);
-
+ segment.setCheckSize(false);
+ segment.initPosition(lastSize);
+ segment.setSize((int) lastSize);
+
+ ByteBuffer buffer1 =
MessageBufferUtilTest.buildMockedMessageBuffer().putLong(
+ MessageBufferUtil.PHYSICAL_OFFSET_POSITION, baseOffset + lastSize);
+ ByteBuffer buffer2 =
MessageBufferUtilTest.buildMockedMessageBuffer().putLong(
+ MessageBufferUtil.PHYSICAL_OFFSET_POSITION, baseOffset + lastSize
+ MessageBufferUtilTest.MSG_LEN);
+ segment.append(buffer1, 0);
+ segment.append(buffer2, 0);
+
+ // Mock a new message arriving
segment.blocker = new CompletableFuture<>();
new Thread(() -> {
try {
@@ -131,20 +140,88 @@ public class TieredFileSegmentTest {
Assert.fail(e.getMessage());
}
ByteBuffer buffer =
MessageBufferUtilTest.buildMockedMessageBuffer();
+ buffer.putLong(MessageBufferUtil.PHYSICAL_OFFSET_POSITION,
MessageBufferUtilTest.MSG_LEN * 2);
buffer.putLong(MessageBufferUtil.STORE_TIMESTAMP_POSITION,
startTime);
segment.append(buffer, 0);
segment.blocker.complete(false);
}).start();
+ // Commit failed
segment.commit();
segment.blocker.join();
+ segment.blocker = null;
+
+ // Copy data and assume commit success
+ segment.getMemStore().put(buffer1);
+ segment.getMemStore().put(buffer2);
+ segment.setSize((int) (lastSize + MessageBufferUtilTest.MSG_LEN * 2));
- segment.blocker = new CompletableFuture<>();
- segment.blocker.complete(true);
segment.commit();
+ Assert.assertEquals(lastSize + MessageBufferUtilTest.MSG_LEN * 3,
segment.getCommitPosition());
+ Assert.assertEquals(baseOffset + lastSize +
MessageBufferUtilTest.MSG_LEN * 3, segment.getCommitOffset());
+ Assert.assertEquals(baseOffset + lastSize +
MessageBufferUtilTest.MSG_LEN * 3, segment.getMaxOffset());
+
+ ByteBuffer msg1 = segment.read(lastSize,
MessageBufferUtilTest.MSG_LEN);
+ Assert.assertEquals(baseOffset + lastSize,
MessageBufferUtil.getCommitLogOffset(msg1));
+
+ ByteBuffer msg2 = segment.read(lastSize +
MessageBufferUtilTest.MSG_LEN, MessageBufferUtilTest.MSG_LEN);
+ Assert.assertEquals(baseOffset + lastSize +
MessageBufferUtilTest.MSG_LEN, MessageBufferUtil.getCommitLogOffset(msg2));
+
+ ByteBuffer msg3 = segment.read(lastSize +
MessageBufferUtilTest.MSG_LEN * 2, MessageBufferUtilTest.MSG_LEN);
+ Assert.assertEquals(baseOffset + lastSize +
MessageBufferUtilTest.MSG_LEN * 2, MessageBufferUtil.getCommitLogOffset(msg3));
+ }
+
+ @Test
+ public void testCommitFailed3Times() {
+ long startTime = System.currentTimeMillis();
+ MemoryFileSegment segment = (MemoryFileSegment)
createFileSegment(FileSegmentType.COMMIT_LOG);
+ long lastSize = segment.getSize();
+ segment.setCheckSize(false);
+ segment.initPosition(lastSize);
+ segment.setSize((int) lastSize);
+
+ ByteBuffer buffer1 =
MessageBufferUtilTest.buildMockedMessageBuffer().putLong(
+ MessageBufferUtil.PHYSICAL_OFFSET_POSITION, baseOffset + lastSize);
+ ByteBuffer buffer2 =
MessageBufferUtilTest.buildMockedMessageBuffer().putLong(
+ MessageBufferUtil.PHYSICAL_OFFSET_POSITION, baseOffset + lastSize
+ MessageBufferUtilTest.MSG_LEN);
+ segment.append(buffer1, 0);
+ segment.append(buffer2, 0);
+
+ // Mock a new message arriving
+ segment.blocker = new CompletableFuture<>();
+ new Thread(() -> {
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException e) {
+ Assert.fail(e.getMessage());
+ }
+ ByteBuffer buffer =
MessageBufferUtilTest.buildMockedMessageBuffer();
+ buffer.putLong(MessageBufferUtil.PHYSICAL_OFFSET_POSITION,
MessageBufferUtilTest.MSG_LEN * 2);
+ buffer.putLong(MessageBufferUtil.STORE_TIMESTAMP_POSITION,
startTime);
+ segment.append(buffer, 0);
+ segment.blocker.complete(false);
+ }).start();
+
+ for (int i = 0; i < 3; i++) {
+ segment.commit();
+ }
+
+ Assert.assertEquals(lastSize, segment.getCommitPosition());
+ Assert.assertEquals(baseOffset + lastSize, segment.getCommitOffset());
+ Assert.assertEquals(baseOffset + lastSize +
MessageBufferUtilTest.MSG_LEN * 3, segment.getMaxOffset());
+
+ segment.blocker.join();
+ segment.blocker = null;
+ segment.commit();
+ Assert.assertEquals(lastSize + MessageBufferUtilTest.MSG_LEN * 2,
segment.getCommitPosition());
+ Assert.assertEquals(baseOffset + lastSize +
MessageBufferUtilTest.MSG_LEN * 2, segment.getCommitOffset());
Assert.assertEquals(baseOffset + lastSize +
MessageBufferUtilTest.MSG_LEN * 3, segment.getMaxOffset());
+
+ segment.commit();
+ Assert.assertEquals(lastSize + MessageBufferUtilTest.MSG_LEN * 3,
segment.getCommitPosition());
Assert.assertEquals(baseOffset + lastSize +
MessageBufferUtilTest.MSG_LEN * 3, segment.getCommitOffset());
+ Assert.assertEquals(baseOffset + lastSize +
MessageBufferUtilTest.MSG_LEN * 3, segment.getMaxOffset());
ByteBuffer msg1 = segment.read(lastSize,
MessageBufferUtilTest.MSG_LEN);
Assert.assertEquals(baseOffset + lastSize,
MessageBufferUtil.getCommitLogOffset(msg1));
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegment.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegment.java
index cb155cf8f7..80ad41f685 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegment.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegment.java
@@ -23,7 +23,7 @@ import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
-import
org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream;
+import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
import org.junit.Assert;
@@ -33,6 +33,8 @@ public class MemoryFileSegment extends TieredFileSegment {
public CompletableFuture<Boolean> blocker;
+ protected int size = 0;
+
protected boolean checkSize = true;
public MemoryFileSegment(FileSegmentType fileType, MessageQueue
messageQueue, long baseOffset,
@@ -56,6 +58,18 @@ public class MemoryFileSegment extends TieredFileSegment {
memStore.position((int) getSize());
}
+ public boolean isCheckSize() {
+ return checkSize;
+ }
+
+ public void setCheckSize(boolean checkSize) {
+ this.checkSize = checkSize;
+ }
+
+ public ByteBuffer getMemStore() {
+ return memStore;
+ }
+
@Override
public String getPath() {
return filePath;
@@ -66,7 +80,11 @@ public class MemoryFileSegment extends TieredFileSegment {
if (checkSize) {
return 1000;
}
- return 0;
+ return size;
+ }
+
+ public void setSize(int size) {
+ this.size = size;
}
@Override
@@ -85,11 +103,11 @@ public class MemoryFileSegment extends TieredFileSegment {
@Override
public CompletableFuture<Boolean> commit0(
- TieredFileSegmentInputStream inputStream, long position, int length,
boolean append) {
+ FileSegmentInputStream inputStream, long position, int length, boolean
append) {
try {
if (blocker != null && !blocker.get()) {
- throw new IllegalStateException();
+ throw new IllegalStateException("Commit Exception for Memory
Test");
}
} catch (InterruptedException | ExecutionException e) {
Assert.fail(e.getMessage());
@@ -98,7 +116,6 @@ public class MemoryFileSegment extends TieredFileSegment {
Assert.assertTrue(!checkSize || position >= getSize());
byte[] buffer = new byte[1024];
-
int startPos = memStore.position();
try {
int len;
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegmentWithoutCheck.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegmentWithoutCheck.java
index 8ac330b370..630fd22236 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegmentWithoutCheck.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegmentWithoutCheck.java
@@ -22,7 +22,7 @@ import java.util.concurrent.ExecutionException;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
-import
org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream;
+import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
import org.junit.Assert;
@@ -46,7 +46,7 @@ public class MemoryFileSegmentWithoutCheck extends
MemoryFileSegment {
}
@Override
- public CompletableFuture<Boolean> commit0(TieredFileSegmentInputStream
inputStream, long position, int length,
+ public CompletableFuture<Boolean> commit0(FileSegmentInputStream
inputStream, long position, int length,
boolean append) {
try {
if (blocker != null && !blocker.get()) {