[ https://issues.apache.org/jira/browse/ROCKETMQ-272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16136453#comment-16136453 ]
Yu Kaiyuan commented on ROCKETMQ-272:
-------------------------------------
Relevant log:
*Producer*
{code:java}
2017-08-22 14:26:25,420 INFO main com.rmq.example.TestProducer 28: Message content size = 512028
2017-08-22 14:26:25,887 INFO main com.rmq.example.TestProducer 35: sendResult = SendResult [sendStatus=FLUSH_SLAVE_TIMEOUT, msgId=0A05121B278412A3A3806CAB9B7D0000, offsetMsgId=0ADD492200002A9F0000000011B26A50, messageQueue=MessageQueue [topic=TopicTestBigMessage, brokerName=broker-a, queueId=0], queueOffset=1]
{code}
*Broker*
{code:java}
2017-08-22 14:26:25 WARN GroupTransferService - transfer messsage to slave timeout, 297196971
2017-08-22 14:26:25 ERROR SendMessageThread_1 - do sync transfer other node, wait return, but failed, topic: TopicTestBigMessage tags: null client address: 10.5.18.27
{code}
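The two producer log lines bracket the send call, so their timestamps give a rough measure of how long the broker took to answer. A small sketch of that arithmetic, assuming both lines use the `yyyy-MM-dd HH:mm:ss,SSS` layout shown above and were logged right before and right after the send (`SendLatency` and `elapsedMillis` are illustrative names):

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class SendLatency {
    // Timestamp layout of the producer log lines, e.g. "2017-08-22 14:26:25,420".
    private static final DateTimeFormatter LOG_TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    // Milliseconds between two log timestamps.
    static long elapsedMillis(String start, String end) {
        LocalDateTime a = LocalDateTime.parse(start, LOG_TS);
        LocalDateTime b = LocalDateTime.parse(end, LOG_TS);
        return Duration.between(a, b).toMillis();
    }

    public static void main(String[] args) {
        long elapsed = elapsedMillis("2017-08-22 14:26:25,420", "2017-08-22 14:26:25,887");
        long syncFlushTimeout = 5000; // default value quoted in the issue, in ms
        System.out.println("elapsed = " + elapsed + " ms"); // elapsed = 467 ms
        System.out.println("within syncFlushTimeout? " + (elapsed < syncFlushTimeout));
    }
}
```

By this measure the producer gets `FLUSH_SLAVE_TIMEOUT` back after roughly 467 ms, far below the 5000 ms default of `syncFlushTimeout`.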
> The config `syncFlushTimeout` doesn't work for SYNC_MASTER
> ----------------------------------------------------------
>
> Key: ROCKETMQ-272
> URL: https://issues.apache.org/jira/browse/ROCKETMQ-272
> Project: Apache RocketMQ
> Issue Type: Bug
> Components: rocketmq-broker
> Affects Versions: 4.1.0-incubating
> Reporter: Yu Kaiyuan
> Assignee: yukon
>
> Sending big messages (>500 KB) in a SYNC_MASTER/SLAVE setup frequently returns
> `sendStatus=FLUSH_SLAVE_TIMEOUT`.
> As far as I can tell, the sync process currently uses the config
> `syncFlushTimeout` as its timeout, and its default value is 5000 milliseconds.
> Yet the producer receives `FLUSH_SLAVE_TIMEOUT` in less than
> 1 second.
> So why does the config not work as expected?
> Relevant code:
> {code:java}
> // CommitLog.java
> public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
>     // ... (earlier part of the method elided; `result`, `request` and
>     // `putMessageResult` are set up there)
>
>     // Synchronous write double
>     if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
>         HAService service = this.defaultMessageStore.getHaService();
>         if (msg.isWaitStoreMsgOK()) {
>             // Determine whether to wait
>             if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
>                 if (null == request) {
>                     request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
>                 }
>                 service.putRequest(request);
>                 service.getWaitNotifyObject().wakeupAll();
>                 // syncFlushTimeout (default 5000 ms) is the only timeout passed in here
>                 boolean flushOK =
>                     // TODO
>                     request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
>                 if (!flushOK) {
>                     log.error("do sync transfer other node, wait return, but failed, topic: " + msg.getTopic() + " tags: "
>                         + msg.getTags() + " client address: " + msg.getBornHostString());
>                     putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
>                 }
>             }
>             // Slave problem
>             else {
>                 // Tell the producer, slave not available
>                 putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
>             }
>         }
>     }
>     return putMessageResult;
> }
> {code}
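In the excerpt, `syncFlushTimeout` only bounds how long the sending thread waits in `request.waitForFlush(...)`; it does not control when the HA side finishes the request. The broker log shows `GroupTransferService` logging its own "transfer messsage to slave timeout" warning in the same second as the "do sync transfer other node" error, which suggests the transfer service completed the request with a failure before 5000 ms had passed. The sketch below models that interaction, assuming `waitForFlush` behaves like a latch wait with a timeout that another thread can release early. `SimpleCommitRequest`, `complete`, `run` and `transferBudgetMillis` are hypothetical names, not RocketMQ API, and the 300 ms budget is an arbitrary stand-in for whatever limit the transfer service actually applies.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class EarlyWakeupDemo {

    // Hypothetical, simplified request object (not RocketMQ's GroupCommitRequest):
    // the sending thread blocks on the latch until another thread completes it.
    static final class SimpleCommitRequest {
        private final CountDownLatch done = new CountDownLatch(1);
        private volatile boolean ok;

        // Called by the replication side once it has decided the outcome.
        void complete(boolean ok) {
            this.ok = ok;
            done.countDown();
        }

        // Called by the sending thread; timeoutMillis plays the role of syncFlushTimeout.
        // Returns early if complete() is called before the timeout elapses.
        boolean waitForFlush(long timeoutMillis) throws InterruptedException {
            done.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return ok;
        }
    }

    static final class Result {
        final boolean flushOK;
        final long elapsedMillis;

        Result(boolean flushOK, long elapsedMillis) {
            this.flushOK = flushOK;
            this.elapsedMillis = elapsedMillis;
        }
    }

    // transferBudgetMillis is a hypothetical, shorter deadline applied by the
    // replication side; in this model the slave never catches up.
    static Result run(long syncFlushTimeoutMillis, long transferBudgetMillis) throws InterruptedException {
        SimpleCommitRequest request = new SimpleCommitRequest();
        Thread transfer = new Thread(() -> {
            try {
                Thread.sleep(transferBudgetMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            request.complete(false); // give up and report failure
        });

        long start = System.nanoTime();
        transfer.start();
        boolean flushOK = request.waitForFlush(syncFlushTimeoutMillis);
        long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        transfer.join();
        return new Result(flushOK, elapsed);
    }

    public static void main(String[] args) throws InterruptedException {
        Result r = run(5000, 300);
        // flushOK is false and the wait ends after roughly 300 ms, not 5000 ms.
        System.out.println("flushOK=" + r.flushOK + ", waited ~" + r.elapsedMillis + " ms");
    }
}
```

If this model holds, raising `syncFlushTimeout` alone would not change the outcome: the sending thread would still be released as soon as the transfer side gives up.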