This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new bb9f106 [ISSUE #1904] Print log when flush timeout (#1903)
bb9f106 is described below
commit bb9f106e31c01e7b38f0c0f130d211da5a4f31cb
Author: rushsky518 <[email protected]>
AuthorDate: Fri Dec 4 15:43:29 2020 +0800
[ISSUE #1904] Print log when flush timeout (#1903)
* rollback my code
* avoid log when disk flush
* when msg is in next file, flushOK value may be wrong
---
.../java/org/apache/rocketmq/store/CommitLog.java | 34 ++++++++++++----------
1 file changed, 18 insertions(+), 16 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index d489e84..cce6481 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -661,14 +661,18 @@ public class CommitLog {
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
- CompletableFuture<PutMessageStatus> flushResultFuture =
submitFlushRequest(result, putMessageResult, msg);
- CompletableFuture<PutMessageStatus> replicaResultFuture =
submitReplicaRequest(result, putMessageResult, msg);
+ CompletableFuture<PutMessageStatus> flushResultFuture =
submitFlushRequest(result, msg);
+ CompletableFuture<PutMessageStatus> replicaResultFuture =
submitReplicaRequest(result, msg);
return flushResultFuture.thenCombine(replicaResultFuture,
(flushStatus, replicaStatus) -> {
if (flushStatus != PutMessageStatus.PUT_OK) {
-
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
+ putMessageResult.setPutMessageStatus(flushStatus);
}
if (replicaStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(replicaStatus);
+ if (replicaStatus == PutMessageStatus.FLUSH_SLAVE_TIMEOUT) {
+ log.error("do sync transfer other node, wait return, but
failed, topic: {} tags: {} client address: {}",
+ msg.getTopic(), msg.getTags(),
msg.getBornHostNameString());
+ }
}
return putMessageResult;
});
@@ -762,15 +766,18 @@ public class CommitLog {
storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).addAndGet(result.getMsgNum());
storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(result.getWroteBytes());
- CompletableFuture<PutMessageStatus> flushOKFuture =
submitFlushRequest(result, putMessageResult, messageExtBatch);
- CompletableFuture<PutMessageStatus> replicaOKFuture =
submitReplicaRequest(result, putMessageResult, messageExtBatch);
+ CompletableFuture<PutMessageStatus> flushOKFuture =
submitFlushRequest(result, messageExtBatch);
+ CompletableFuture<PutMessageStatus> replicaOKFuture =
submitReplicaRequest(result, messageExtBatch);
return flushOKFuture.thenCombine(replicaOKFuture, (flushStatus,
replicaStatus) -> {
if (flushStatus != PutMessageStatus.PUT_OK) {
-
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
+ putMessageResult.setPutMessageStatus(flushStatus);
}
-
if (replicaStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(replicaStatus);
+ if (replicaStatus == PutMessageStatus.FLUSH_SLAVE_TIMEOUT) {
+ log.error("do sync transfer other node, wait return, but
failed, topic: {} client address: {}",
+ messageExtBatch.getTopic(),
messageExtBatch.getBornHostNameString());
+ }
}
return putMessageResult;
});
@@ -900,8 +907,7 @@ public class CommitLog {
return putMessageResult;
}
- public CompletableFuture<PutMessageStatus>
submitFlushRequest(AppendMessageResult result, PutMessageResult
putMessageResult,
- MessageExt
messageExt) {
+ public CompletableFuture<PutMessageStatus>
submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
// Synchronization flush
if (FlushDiskType.SYNC_FLUSH ==
this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
final GroupCommitService service = (GroupCommitService)
this.flushCommitLogService;
@@ -926,8 +932,7 @@ public class CommitLog {
}
}
- public CompletableFuture<PutMessageStatus>
submitReplicaRequest(AppendMessageResult result, PutMessageResult
putMessageResult,
- MessageExt messageExt)
{
+ public CompletableFuture<PutMessageStatus>
submitReplicaRequest(AppendMessageResult result, MessageExt messageExt) {
if (BrokerRole.SYNC_MASTER ==
this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
HAService service = this.defaultMessageStore.getHaService();
if (messageExt.isWaitStoreMsgOK()) {
@@ -1420,13 +1425,10 @@ public class CommitLog {
for (GroupCommitRequest req : this.requestsRead) {
// There may be a message in the next file, so a
maximum of
// two times the flush
- boolean flushOK = false;
+ boolean flushOK =
CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
for (int i = 0; i < 2 && !flushOK; i++) {
+ CommitLog.this.mappedFileQueue.flush(0);
flushOK =
CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
-
- if (!flushOK) {
- CommitLog.this.mappedFileQueue.flush(0);
- }
}
req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK :
PutMessageStatus.FLUSH_DISK_TIMEOUT);