wenqi.huang created ROCKETMQ-267:
------------------------------------
Summary: server may reject messages when pdflush writes dirty data
back into disk
Key: ROCKETMQ-267
URL: https://issues.apache.org/jira/browse/ROCKETMQ-267
Project: Apache RocketMQ
Issue Type: Bug
Affects Versions: 4.1.0-incubating
Environment: linux
Reporter: wenqi.huang
Assignee: vongosling
Priority: Critical
I found the following error in the client's log:
2017-08-10 09:06:57 ERROR [DubboServerHandler-10.28.109.45:20994-thread-475]
c.c.d.a.c.b.r.RocketMQMsgProducer[RocketMQMsgProducer.java:42] -> Send ons msg
failed, topic=TopicSubOrderDataSync, tag=TagActivityDataSync,
key=activity-order-411320385584760066, msg=411320385584760066
org.apache.rocketmq.client.exception.MQBrokerException: CODE: 2 DESC:
[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in
queue: 208ms, size of queue: 17
For more information, please visit the url, http://rocketmq.apache.org/docs/faq/
at
org.apache.rocketmq.client.impl.MQClientAPIImpl.processSendResponse(MQClientAPIImpl.java:531)
~[rocketmq-client-4.0.0-incubating.jar!/:4.0.0-incubating]
at
org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:345)
~[rocketmq-client-4.0.0-incubating.jar!/:4.0.0-incubating]
at
org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:327)
~[rocketmq-client-4.0.0-incubating.jar!/:4.0.0-incubating]
at
org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:290)
~[rocketmq-client-4.0.0-incubating.jar!/:4.0.0-incubating]
at
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:688)
~[rocketmq-client-4.0.0-incubating.jar!/:4.0.0-incubating]
at
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:458)
~[rocketmq-client-4.0.0-incubating.jar!/:4.0.0-incubating]
at
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1049)
~[rocketmq-client-4.0.0-incubating.jar!/:4.0.0-incubating]
at
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1008)
~[rocketmq-client-4.0.0-incubating.jar!/:4.0.0-incubating]
at
org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:204)
~[rocketmq-client-4.0.0-incubating.jar!/:4.0.0-incubating]
I then looked into store.log on the RocketMQ broker and found the following
line at the same timestamp:
2017-08-10 09:06:57 INFO FlushRealTimeService - Flush data to disk costs 1240 ms
I looked into the source code and found that RocketMQ has index files which it
does not flush immediately (because they are written randomly). Only when an
index file has been completely written (about 500MB) does RocketMQ finally
force it to disk, which means there can be up to about 500MB of dirty pages.
(I have executed bin/os.sh from RocketMQ, which changes the default behavior of
Linux pdflush.) Because [vm.dirty_ratio] is 50 and the available memory on my
Linux machine is about 1600MB, 500MB does not exceed 50% of 1600MB (800MB), so
pdflush is not triggered by that threshold. So I guess that the eventual
writeback impacts the RT (response time) of writes.
I wrote a test case that confirms this. The code is:
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
import java.util.Arrays;

public class MappedFileTest {

    public static void main(String[] args) throws IOException, InterruptedException {
        // Mock RocketMQ's index file.
        String indexFile = "/home/admin/rocketmq-data/index/index";
        int indexFileToWriteInMB = 180;
        FileChannel indexFileChannel = new RandomAccessFile(new File(indexFile), "rw").getChannel();
        final MappedByteBuffer indexFileBuffer =
                indexFileChannel.map(MapMode.READ_WRITE, 0, 1024 * 1024 * 500); // 500MB
        // Put some dirty data. Note that the data size must not exceed
        // vm.dirty_background_ratio. Because vm.dirty_expire_centisecs is set
        // to 3000, pdflush writes the dirty data back to disk after 30 seconds.
        byte[] bs = new byte[1024 * 1024 * indexFileToWriteInMB]; // 180MB
        Arrays.fill(bs, (byte) 1);
        indexFileBuffer.put(bs);

        // Mock RocketMQ's commitlog file.
        String commitLogFile = "/home/admin/rocketmq-data/commitlog/commitlog";
        FileChannel commitLogChannel = new RandomAccessFile(new File(commitLogFile), "rw").getChannel();
        final MappedByteBuffer commitLogBuffer =
                commitLogChannel.map(MapMode.READ_WRITE, 0, 1024 * 1024 * 1024); // 1GB

        final Object lockObj = new Object();
        // Mock FlushCommitLogService, which writes dirty commitLog data back to disk.
        FlushCommitLogService commitLogService = new FlushCommitLogService(lockObj, commitLogBuffer);
        commitLogService.start();

        // Mock message reception, which writes data into the commitLog file.
        mockMessageReceived(lockObj, commitLogBuffer);
        // Wait for about 30 seconds (to let Linux pdflush write back the dirty
        // data of the index file). You will then see output like the lines
        // below. This blocks the thread that handles messages, so the client
        // fails to send messages until pdflush is done:
        // ---write cost ms: 213
        // flushToDisk cost ms:502
    }

    private static void mockMessageReceived(Object lockObj, MappedByteBuffer commitLogBuffer) {
        byte[] bs = new byte[1024 * 10]; // 10KB
        Arrays.fill(bs, (byte) 1);
        for (int i = 0; i < 1000; i++) {
            commitLogBuffer.position(bs.length * i);
            long start = System.currentTimeMillis();
            commitLogBuffer.put(bs);
            long cost = System.currentTimeMillis() - start;
            if (cost > 1) {
                System.out.println("---write cost ms:" + cost);
            }
            // Every second write (20KB of new dirty data), wake up the flush thread.
            if (i != 0 && i % 2 == 0) {
                synchronized (lockObj) {
                    lockObj.notify();
                }
            }
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static class FlushCommitLogService extends Thread {
        private final Object lockObj;
        private final MappedByteBuffer commitLogBuffer;

        public FlushCommitLogService(Object lockObj, MappedByteBuffer commitLogBuffer) {
            this.lockObj = lockObj;
            this.commitLogBuffer = commitLogBuffer;
        }

        @Override
        public void run() {
            // Loops forever; stop the test manually once the writer has finished.
            while (true) {
                synchronized (lockObj) {
                    try {
                        lockObj.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("flushToDisk started");
                long start = System.currentTimeMillis();
                commitLogBuffer.force();
                long cost = System.currentTimeMillis() - start;
                System.out.println("flushToDisk cost ms:" + cost);
            }
        }
    }
}
Before running this test case, make sure you are running it on Linux, and set
the following Linux configs:
vm.dirty_background_ratio = 50
vm.dirty_ratio = 50
vm.dirty_expire_centisecs = 3000
vm.dirty_writeback_centisecs = 500
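To double-check these settings before running the test, a small helper along the following lines may be useful. It is only a sketch: the class name VmDirtySettings and its methods are made up for illustration, and it assumes the settings are exposed as files under /proc/sys/vm, as on a standard Linux system. It also works out the dirty-page threshold for the 1600MB of available memory mentioned in this issue.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper (not part of RocketMQ): prints the vm.dirty_* settings.
public class VmDirtySettings {
    static final String[] KEYS = {
        "dirty_background_ratio",
        "dirty_ratio",
        "dirty_expire_centisecs",
        "dirty_writeback_centisecs"
    };

    // Reads one integer setting from <dir>/<key>, e.g. /proc/sys/vm/dirty_ratio.
    public static long read(Path dir, String key) throws IOException {
        byte[] raw = Files.readAllBytes(dir.resolve(key));
        return Long.parseLong(new String(raw, StandardCharsets.UTF_8).trim());
    }

    // Dirty-page threshold in MB for a ratio given as a percentage of available memory.
    public static long thresholdMB(long availableMB, long ratioPercent) {
        return availableMB * ratioPercent / 100;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Paths.get("/proc/sys/vm");
        for (String key : KEYS) {
            System.out.println("vm." + key + " = " + read(dir, key));
        }
        // 1600 is the available memory (in MB) reported in this issue; adjust for your machine.
        long ratio = read(dir, "dirty_ratio");
        System.out.println("dirty threshold (MB): " + thresholdMB(1600, ratio));
    }
}
```

With vm.dirty_ratio = 50 and 1600MB available, thresholdMB gives 800, so the 180MB written to the index file in the test stays well below the threshold.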
The code does the following things:
1. Opens an index file and writes some dirty data into it (without flushing
manually, so that Linux pdflush writes it back to disk).
2. Opens a commitLog file and writes some dirty data into it. Every time the
dirty data reaches 20KB, it notifies another thread to force the data to disk.
Looking at the output, the RT is healthy at this point.
3. Waits about 30 seconds, until pdflush wakes up and finds that the dirty data
in the index file has existed for 30 seconds; pdflush then writes it back to
disk (you can execute [cat /proc/meminfo|grep Dirty] to show the size of all
dirty data). At the same time, the output shows lines like this:
---write cost ms: 213
flushToDisk cost ms:502
The RT of the write call is unacceptable. It blocks the thread that handles
messages, so the client fails to send messages until pdflush is done. And if
you write more dirty data into the index file, the write RT grows as well.
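To watch the dirty data size while the test runs, without repeatedly typing the cat command from step 3, something like the following could be used. This is a rough sketch: DirtyPagesMonitor is a made-up name, and it assumes /proc/meminfo reports the value on a line of the form "Dirty: <number> kB".

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

// Hypothetical helper: polls the "Dirty:" line of /proc/meminfo once per second.
public class DirtyPagesMonitor {

    // Returns the value of the "Dirty:" line in kB, or -1 if no such line exists.
    public static long parseDirtyKb(List<String> meminfoLines) {
        for (String line : meminfoLines) {
            if (line.startsWith("Dirty:")) {
                // Expected shape: "Dirty:   184320 kB"
                String[] parts = line.trim().split("\\s+");
                return Long.parseLong(parts[1]);
            }
        }
        return -1;
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        // Poll for one minute, which covers the 30-second expiry used in the test.
        for (int i = 0; i < 60; i++) {
            List<String> lines =
                    Files.readAllLines(Paths.get("/proc/meminfo"), StandardCharsets.UTF_8);
            System.out.println("Dirty kB: " + parseDirtyKb(lines));
            Thread.sleep(1000);
        }
    }
}
```

Running it next to MappedFileTest should show the Dirty value dropping at about the time the slow writes are printed, if the writeback explanation is right.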
bin/os.sh sets vm.dirty_writeback_centisecs to 360000 (one hour). Looking into
store.log, I found that the log [Flush data to disk costs * ms] appears hourly,
which makes my guess much more plausible.
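To check the hourly pattern on another broker, the slow-flush lines can be pulled out of store.log with a short scanner. The sketch below assumes every such line has the same shape as the single sample quoted in this issue (timestamp first, then "Flush data to disk costs N ms"); FlushCostScanner is a hypothetical name, and the path to store.log is passed as the first argument.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: lists the timestamp and cost of each slow flush in store.log.
public class FlushCostScanner {
    // Line shape assumed from the sample in this issue:
    // 2017-08-10 09:06:57 INFO FlushRealTimeService - Flush data to disk costs 1240 ms
    private static final Pattern FLUSH_LINE = Pattern.compile(
            "^(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}).*Flush data to disk costs (\\d+) ms");

    // Returns {timestamp, costMs} for a matching line, or null otherwise.
    public static String[] parse(String line) {
        Matcher m = FLUSH_LINE.matcher(line);
        if (!m.find()) {
            return null;
        }
        return new String[] {m.group(1), m.group(2)};
    }

    public static void main(String[] args) throws IOException {
        // args[0]: path to the broker's store.log
        for (String line : Files.readAllLines(Paths.get(args[0]), StandardCharsets.UTF_8)) {
            String[] hit = parse(line);
            if (hit != null) {
                System.out.println(hit[0] + " -> " + hit[1] + " ms");
            }
        }
    }
}
```

If the timestamps it prints are spaced roughly one hour apart, that matches the vm.dirty_writeback_centisecs value set by bin/os.sh.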
I noticed that the index file has three sections: the first is the header, the
second is an index written randomly, and the third is another type of index
written sequentially. So maybe the second section could be moved into a
separate file, which would use only about 20MB of memory and make writeback
faster. At the same time, the third section could be written sequentially and
written back to disk as soon as possible, just like the commitlog. However,
keeping these two index files consistent when recovering from a system crash is
also a problem.
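The sizes of the three sections can be estimated with a little arithmetic. The constants below are assumptions: they are what I believe the default index file parameters to be (a 40-byte header, 5,000,000 hash slots of 4 bytes each, 20,000,000 index entries of 20 bytes each) and should be verified against the broker's source and configuration. IndexFileLayout is a made-up class name.

```java
// Back-of-the-envelope sizes of the index file sections.
// All constants are ASSUMED defaults, not values confirmed in this issue.
public class IndexFileLayout {
    static final long HEADER_BYTES = 40;            // section 1: header
    static final long HASH_SLOT_NUM = 5_000_000L;   // section 2: randomly written slots
    static final long HASH_SLOT_BYTES = 4;
    static final long INDEX_NUM = 20_000_000L;      // section 3: sequentially written entries
    static final long INDEX_ENTRY_BYTES = 20;

    public static long slotSectionBytes() {
        return HASH_SLOT_NUM * HASH_SLOT_BYTES;
    }

    public static long entrySectionBytes() {
        return INDEX_NUM * INDEX_ENTRY_BYTES;
    }

    public static long totalBytes() {
        return HEADER_BYTES + slotSectionBytes() + entrySectionBytes();
    }

    public static void main(String[] args) {
        System.out.println("slot section bytes:  " + slotSectionBytes());
        System.out.println("entry section bytes: " + entrySectionBytes());
        System.out.println("total bytes:         " + totalBytes());
    }
}
```

Under these assumptions the randomly written slot section is 20,000,000 bytes, in line with the 20MB figure above, while the sequential entry section accounts for almost all of the rest of the file.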