shuangchengsun commented on a change in pull request #3828:
URL: https://github.com/apache/rocketmq/pull/3828#discussion_r802300014
##########
File path:
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
##########
@@ -509,19 +510,27 @@ private boolean isLmqConsumeQueueNumExceeded() {
@Override
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
- try {
- return asyncPutMessage(msg).get();
- } catch (InterruptedException | ExecutionException e) {
- return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null);
- }
+ return waitForPutResult(asyncPutMessage(msg));
}
@Override
public PutMessageResult putMessages(MessageExtBatch messageExtBatch) {
+ return waitForPutResult(asyncPutMessages(messageExtBatch));
+ }
+
+ private PutMessageResult waitForPutResult(CompletableFuture<PutMessageResult> putMessageResultFuture) {
try {
- return asyncPutMessages(messageExtBatch).get();
- } catch (InterruptedException | ExecutionException e) {
+ int putMessageTimeout =
+ Math.min(this.messageStoreConfig.getSyncFlushTimeout(), this.messageStoreConfig.getSlaveTimeout());
+ return putMessageResultFuture.get(putMessageTimeout, TimeUnit.MILLISECONDS);
Review comment:
I don't think it is necessary to add a new config option. Adding one may
be confusing, since there are already enough parameters. The parameter
「SlaveTimeout」is used to distinguish which step timed out, whereas in the
old version "flush disk" and "flush slave" shared the same value,
「SyncFlushTimeout」. However, I just found that the code above still has
some problems, which I will fix right away.
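
For reference, the bounded wait being discussed can be sketched in isolation roughly as follows. This is only an illustration under stated assumptions, not the code in this PR: the class PutTimeoutSketch, the Status enum, and the waitForResult helper are hypothetical stand-ins (Status is not RocketMQ's PutMessageStatus), and the two timeouts are passed in as plain millisecond values rather than read from the store config. It waits for at most the smaller of the two timeouts and maps each failure mode to its own status.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch; none of these names come from RocketMQ.
class PutTimeoutSketch {
    // Stand-in for a put-result status, for illustration only.
    enum Status { PUT_OK, TIMEOUT, UNKNOWN_ERROR }

    // Wait for the future, bounded by the smaller of the two timeouts.
    static Status waitForResult(CompletableFuture<Status> future,
                                long syncFlushTimeoutMs,
                                long slaveTimeoutMs) {
        long timeoutMs = Math.min(syncFlushTimeoutMs, slaveTimeoutMs);
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // The future did not complete within the bound.
            return Status.TIMEOUT;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            return Status.UNKNOWN_ERROR;
        } catch (ExecutionException e) {
            // The asynchronous computation itself failed.
            return Status.UNKNOWN_ERROR;
        }
    }

    public static void main(String[] args) {
        // A future that never completes, to exercise the timeout path.
        CompletableFuture<Status> pending = new CompletableFuture<>();
        System.out.println(waitForResult(pending, 50, 3000));
    }
}
```

Note that with Math.min the shorter of the two values always decides the wait, so in this sketch the caller cannot tell from the timeout alone which step was the slow one.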