This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 33cb22e1c0 [ISSUE #7018] fix append in tiered storage when message
offset incorrect (#7019)
33cb22e1c0 is described below
commit 33cb22e1c0fa7ba980567117230fe443ff5dbd62
Author: lizhimins <[email protected]>
AuthorDate: Thu Jul 13 15:37:24 2023 +0800
[ISSUE #7018] fix append in tiered storage when message offset incorrect
(#7019)
* fix append in tiered storage when message offset incorrect
---
.../rocketmq/tieredstore/TieredDispatcher.java | 25 +++++++++++-------
.../tieredstore/file/CompositeFlatFile.java | 30 +++++++++++-----------
.../tieredstore/file/CompositeQueueFlatFile.java | 2 +-
.../file/CompositeQueueFlatFileTest.java | 4 +--
4 files changed, 34 insertions(+), 27 deletions(-)
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
index 2a8e2ed71f..6584b0e89e 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
@@ -308,9 +308,18 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
doRedispatchRequestToWriteMap(
result, flatFile, dispatchOffset, newCommitLogOffset,
size, tagCode, message.getByteBuffer());
message.release();
- if (result != AppendResult.SUCCESS) {
- dispatchOffset--;
- break;
+
+ switch (result) {
+ case SUCCESS:
+ continue;
+ case FILE_CLOSED:
+ tieredFlatFileManager.destroyCompositeFile(flatFile.getMessageQueue());
+ logger.info("TieredDispatcher#dispatchFlatFile: file has been close and destroy, " +
+ "topic: {}, queueId: {}", topic, queueId);
+ return;
+ default:
+ dispatchOffset--;
+ break;
}
}
@@ -341,15 +350,13 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
switch (result) {
case SUCCESS:
- break;
- case OFFSET_INCORRECT:
long offset = MessageBufferUtil.getQueueOffset(message);
if (queueOffset != offset) {
- logger.error("[Bug] Commitlog offset incorrect, " +
- "result={}, topic={}, queueId={}, offset={}, msg offset={}",
- result, topic, queueId, queueOffset, offset);
+ logger.error("Message cq offset in commitlog does not meet expectations, " +
+ "result={}, topic={}, queueId={}, cq offset={}, msg offset={}",
+ AppendResult.OFFSET_INCORRECT, topic, queueId, queueOffset, offset);
}
- return;
+ break;
case BUFFER_FULL:
logger.debug("Commitlog buffer full, result={}, topic={}, queueId={}, offset={}",
result, topic, queueId, queueOffset);
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
index 1243f77213..8f8ba98b1d 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
@@ -29,6 +29,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
@@ -58,7 +59,7 @@ public class CompositeFlatFile implements CompositeAccess {
* dispatched to the current chunk, indicating the progress of the message distribution.
* It's consume queue current offset.
*/
- protected volatile long dispatchOffset;
+ protected final AtomicLong dispatchOffset;
protected final ReentrantLock compositeFlatFileLock;
protected final TieredMessageStoreConfig storeConfig;
@@ -75,6 +76,7 @@ public class CompositeFlatFile implements CompositeAccess {
this.storeConfig = fileQueueFactory.getStoreConfig();
this.readAheadFactor = this.storeConfig.getReadAheadMinFactor();
this.metadataStore = TieredStoreUtil.getMetadataStore(this.storeConfig);
+ this.dispatchOffset = new AtomicLong();
this.compositeFlatFileLock = new ReentrantLock();
this.inFlightRequestMap = new ConcurrentHashMap<>();
this.commitLog = new TieredCommitLog(fileQueueFactory, filePath);
@@ -83,8 +85,8 @@ public class CompositeFlatFile implements CompositeAccess {
}
protected void recoverMetadata() {
- if (!consumeQueue.isInitialized() && this.dispatchOffset != -1) {
- consumeQueue.setBaseOffset(this.dispatchOffset * TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
+ if (!consumeQueue.isInitialized() && this.dispatchOffset.get() != -1) {
+ consumeQueue.setBaseOffset(this.dispatchOffset.get() * TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
}
}
@@ -144,7 +146,7 @@ public class CompositeFlatFile implements CompositeAccess {
}
public long getDispatchOffset() {
- return dispatchOffset;
+ return dispatchOffset.get();
}
@Override
@@ -309,7 +311,7 @@ public class CompositeFlatFile implements CompositeAccess {
if (!consumeQueue.isInitialized()) {
consumeQueue.setBaseOffset(offset * TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
}
- dispatchOffset = offset;
+ dispatchOffset.set(offset);
}
@Override
@@ -323,14 +325,9 @@ public class CompositeFlatFile implements CompositeAccess {
return AppendResult.FILE_CLOSED;
}
- long queueOffset = MessageBufferUtil.getQueueOffset(message);
- if (dispatchOffset != queueOffset) {
- return AppendResult.OFFSET_INCORRECT;
- }
-
AppendResult result = commitLog.append(message, commit);
if (result == AppendResult.SUCCESS) {
- dispatchOffset = queueOffset + 1;
+ dispatchOffset.incrementAndGet();
}
return result;
}
@@ -483,14 +480,17 @@ public class CompositeFlatFile implements CompositeAccess {
}
public void destroy() {
- closed = true;
- commitLog.destroy();
- consumeQueue.destroy();
try {
+ closed = true;
+ compositeFlatFileLock.lock();
+ commitLog.destroy();
+ consumeQueue.destroy();
metadataStore.deleteFileSegment(filePath, FileSegmentType.COMMIT_LOG);
metadataStore.deleteFileSegment(filePath, FileSegmentType.CONSUME_QUEUE);
} catch (Exception e) {
- LOGGER.error("CompositeFlatFile#destroy: clean metadata failed: ", e);
+ LOGGER.error("CompositeFlatFile#destroy: delete file failed", e);
+ } finally {
+ compositeFlatFileLock.unlock();
}
}
}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java
index c0cf79069d..f6c0afed05 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFile.java
@@ -64,7 +64,7 @@ public class CompositeQueueFlatFile extends CompositeFlatFile {
if (queueMetadata.getMaxOffset() < queueMetadata.getMinOffset()) {
queueMetadata.setMaxOffset(queueMetadata.getMinOffset());
}
- this.dispatchOffset = queueMetadata.getMaxOffset();
+ this.dispatchOffset.set(queueMetadata.getMaxOffset());
}
public void persistMetadata() {
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
index 9735a535e9..8322c72edf 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
@@ -73,8 +73,8 @@ public class CompositeQueueFlatFileTest {
CompositeQueueFlatFile flatFile = new CompositeQueueFlatFile(tieredFileAllocator, mq);
ByteBuffer message = MessageBufferUtilTest.buildMockedMessageBuffer();
AppendResult result = flatFile.appendCommitLog(message);
- Assert.assertEquals(AppendResult.OFFSET_INCORRECT, result);
- Assert.assertEquals(0L, flatFile.commitLog.getFlatFile().getFileToWrite().getAppendPosition());
+ Assert.assertEquals(AppendResult.SUCCESS, result);
+ Assert.assertEquals(122L, flatFile.commitLog.getFlatFile().getFileToWrite().getAppendPosition());
Assert.assertEquals(0L, flatFile.commitLog.getFlatFile().getFileToWrite().getCommitPosition());
flatFile = new CompositeQueueFlatFile(tieredFileAllocator, mq);