saharaheart edited a comment on issue #2830:
URL: https://github.com/apache/rocketmq/issues/2830#issuecomment-825308210
First, look at org/apache/rocketmq/store/CommitLog#putMessage. The message is
appended at point A (annotated in the following code), while the disk flush
and the wait for HA replication happen at point B.
However, ReputMessageService
(org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService) performs
the reput operation in a separate thread. If this service is fast enough, some
of the messages appended at point A may already have been dispatched to the
ConsumeQueue, where consumers can see them, before they have been flushed to
disk or replicated to the slaves. If the broker fails at that moment, is it
possible for a message to be consumed but never recorded in the broker?
`public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
    //.............
    // Point A: the message is appended to the mapped file below
    MappedFile unlockMappedFile = null;
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
    putMessageLock.lock(); // spin or ReentrantLock, depending on store config
    try {
        long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
        this.beginTimeInLock = beginLockTimestamp;
        // Here settings are stored timestamp, in order to ensure an orderly
        // global
        msg.setStoreTimestamp(beginLockTimestamp);
        if (null == mappedFile || mappedFile.isFull()) {
            mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
        }
        if (null == mappedFile) {
            log.error("create mapped file1 error, topic: " + msg.getTopic()
                + " clientAddr: " + msg.getBornHostString());
            beginTimeInLock = 0;
            return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
        }
        result = mappedFile.appendMessage(msg, this.appendMessageCallback);
        ..............
    if (elapsedTimeInLock > 500) {
        log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}",
            elapsedTimeInLock, msg.getBody().length, result);
    }
    if (null != unlockMappedFile
        && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
        this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
    }
    PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
    // Statistics
    storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
    storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
    // Point B: the disk flush and the HA handling and waiting actually happen here
    handleDiskFlush(result, putMessageResult, msg);
    handleHA(result, putMessageResult, msg);
    return putMessageResult;
}`
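
To make the concern concrete, here is a minimal single-threaded sketch of the
interleaving I am worried about. It is a toy model, not RocketMQ code: the
class ReputVisibilitySketch and all of its fields and methods are hypothetical
names, and it assumes the dispatcher reads up to the write position rather
than the flushed position, which is exactly the point I would like confirmed.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: none of these names are RocketMQ APIs.
class ReputVisibilitySketch {
    // Appended messages (stands in for the mapped file contents).
    private final List<String> log = new ArrayList<>();
    // Number of messages durably flushed (stands in for the flushed position).
    private int flushed = 0;
    // Next log index the dispatcher reads (stands in for the reput offset).
    private int reputFrom = 0;
    // Messages visible to consumers (stands in for the consume queue).
    private final List<String> consumeQueue = new ArrayList<>();

    // Point A: append only; nothing is durable yet.
    void append(String msg) {
        log.add(msg);
    }

    // Point B: flush everything appended so far.
    void flush() {
        flushed = log.size();
    }

    // One dispatcher pass. ASSUMPTION: it is bounded by the write position
    // (log.size()), not by the flushed position.
    void reputOnce() {
        while (reputFrom < log.size()) {
            consumeQueue.add(log.get(reputFrom++));
        }
    }

    // Simulated crash: only the flushed prefix of the log survives.
    List<String> survivingAfterCrash() {
        return new ArrayList<>(log.subList(0, flushed));
    }

    // Messages consumers could already see that would not survive the crash.
    List<String> consumedButLost() {
        List<String> lost = new ArrayList<>(consumeQueue);
        lost.removeAll(survivingAfterCrash());
        return lost;
    }

    static List<String> demo() {
        ReputVisibilitySketch s = new ReputVisibilitySketch();
        s.append("m1");
        s.flush();
        s.reputOnce();      // m1 is flushed and then dispatched: safe
        s.append("m2");
        s.reputOnce();      // m2 is dispatched between point A and point B
        return s.consumedButLost(); // crash happens here, before flush()
    }

    public static void main(String[] args) {
        System.out.println("consumed but lost: " + demo()); // prints "consumed but lost: [m2]"
    }
}
```

In this model m2 reaches the consume queue but is gone after the crash. If the
real dispatcher were bounded by the flushed (or replicated) position instead,
reputOnce() would stop at flushed and the list would be empty.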
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]