This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 80c0330f75 ConfirmOffset directly takes the max offset when
allAckInSyncStateSet is false (#7657)
80c0330f75 is described below
commit 80c0330f752dcf3219cb8631f3004c1725bedf1e
Author: rongtong <[email protected]>
AuthorDate: Mon Dec 18 10:10:28 2023 +0800
ConfirmOffset directly takes the max offset when allAckInSyncStateSet is
false (#7657)
---
.../java/org/apache/rocketmq/store/CommitLog.java | 23 ++++++++++++----------
1 file changed, 13 insertions(+), 10 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 35c1d0e2d7..cc29cca5d9 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -314,6 +314,7 @@ public class CommitLog implements Swappable {
/**
* When the normal exit, data recovery, all memory data have been flush
+ *
* @throws RocksDBException only in rocksdb mode
*/
public void recoverNormally(long maxPhyOffsetOfConsumeQueue) throws
RocksDBException {
@@ -636,7 +637,8 @@ public class CommitLog implements Swappable {
public long getConfirmOffset() {
if
(this.defaultMessageStore.getBrokerConfig().isEnableControllerMode()) {
if
(this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() !=
BrokerRole.SLAVE && !this.defaultMessageStore.getRunningFlags().isFenced()) {
- if (((AutoSwitchHAService)
this.defaultMessageStore.getHaService()).getLocalSyncStateSet().size() == 1) {
+ if (((AutoSwitchHAService)
this.defaultMessageStore.getHaService()).getLocalSyncStateSet().size() == 1
+ ||
!this.defaultMessageStore.getMessageStoreConfig().isAllAckInSyncStateSet()) {
return this.defaultMessageStore.getMaxPhyOffset();
}
// First time it will compute the confirmOffset.
@@ -1214,7 +1216,7 @@ public class CommitLog implements Swappable {
}
} catch (RocksDBException e) {
return CompletableFuture.completedFuture(new
PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
- } finally {
+ } finally {
topicQueueLock.unlock(topicQueueKey);
}
@@ -1840,7 +1842,8 @@ public class CommitLog implements Swappable {
this.messageStoreConfig = messageStoreConfig;
}
- public AppendMessageResult handlePropertiesForLmqMsg(ByteBuffer
preEncodeBuffer, final MessageExtBrokerInner msgInner) {
+ public AppendMessageResult handlePropertiesForLmqMsg(ByteBuffer
preEncodeBuffer,
+ final MessageExtBrokerInner msgInner) {
if (msgInner.isEncodeCompleted()) {
return null;
}
@@ -1850,10 +1853,10 @@ public class CommitLog implements Swappable {
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
final byte[] propertiesData =
- msgInner.getPropertiesString() == null ? null :
msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
+ msgInner.getPropertiesString() == null ? null :
msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
boolean needAppendLastPropertySeparator = enabledAppendPropCRC &&
propertiesData != null && propertiesData.length > 0
- && propertiesData[propertiesData.length - 1] !=
MessageDecoder.PROPERTY_SEPARATOR;
+ && propertiesData[propertiesData.length - 1] !=
MessageDecoder.PROPERTY_SEPARATOR;
final int propertiesLength = (propertiesData == null ? 0 :
propertiesData.length) + (needAppendLastPropertySeparator ? 1 : 0) +
crc32ReservedLength;
@@ -2312,7 +2315,7 @@ public class CommitLog implements Swappable {
return true;
}
- int pos = (int)(offset %
defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog());
+ int pos = (int) (offset %
defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog());
int realIndex = pos / pageSize / sampleSteps;
return bytes.length - 1 >= realIndex && bytes[realIndex] != 0;
}
@@ -2356,8 +2359,8 @@ public class CommitLog implements Swappable {
private byte[] checkFileInPageCache(MappedFile mappedFile) {
long fileSize = mappedFile.getFileSize();
- final long address =
((DirectBuffer)mappedFile.getMappedByteBuffer()).address();
- int pageNums = (int)(fileSize + this.pageSize - 1) / this.pageSize;
+ final long address = ((DirectBuffer)
mappedFile.getMappedByteBuffer()).address();
+ int pageNums = (int) (fileSize + this.pageSize - 1) /
this.pageSize;
byte[] pageCacheRst = new byte[pageNums];
int mincore = LibC.INSTANCE.mincore(new Pointer(address), new
NativeLong(fileSize), pageCacheRst);
if (mincore != 0) {
@@ -2395,7 +2398,7 @@ public class CommitLog implements Swappable {
return false;
}
try {
- ConsumeQueue consumeQueue =
(ConsumeQueue)defaultMessageStore.findConsumeQueue(topic, queueId);
+ ConsumeQueue consumeQueue = (ConsumeQueue)
defaultMessageStore.findConsumeQueue(topic, queueId);
if (null == consumeQueue) {
return false;
}
@@ -2433,7 +2436,7 @@ public class CommitLog implements Swappable {
log.error("setFileReadMode mappedFile is null");
return -1;
}
- final long address =
((DirectBuffer)mappedFile.getMappedByteBuffer()).address();
+ final long address = ((DirectBuffer)
mappedFile.getMappedByteBuffer()).address();
int madvise = LibC.INSTANCE.madvise(new Pointer(address), new
NativeLong(mappedFile.getFileSize()), mode);
if (madvise != 0) {
log.error("setFileReadMode error fileName: {}, madvise: {},
mode:{}", mappedFile.getFileName(), madvise, mode);