[ 
https://issues.apache.org/jira/browse/ROCKETMQ-267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16126853#comment-16126853
 ] 

zhaoziyan commented on ROCKETMQ-267:
------------------------------------

pdflush does disk I/O, but writing through mmap is a memory write. Why would
pdflush affect commitLog writes?
!screenshot-1.png!

> server may reject messages when pdflush writes dirty data back into disk
> ------------------------------------------------------------------------
>
>                 Key: ROCKETMQ-267
>                 URL: https://issues.apache.org/jira/browse/ROCKETMQ-267
>             Project: Apache RocketMQ
>          Issue Type: Bug
>    Affects Versions: 4.1.0-incubating
>         Environment: linux
>            Reporter: wenqi.huang
>            Assignee: vongosling
>            Priority: Critical
>         Attachments: screenshot-1.png
>
>
> I found the following error in the client's log:
> 2017-08-10 09:06:57  ERROR [DubboServerHandler-10.28.109.45:20994-thread-475] c.c.d.a.c.b.r.RocketMQMsgProducer[RocketMQMsgProducer.java:42] -> Send ons msg failed, topic=TopicSubOrderDataSync, tag=TagActivityDataSync, key=activity-order-411320385584760066, msg=411320385584760066
> org.apache.rocketmq.client.exception.MQBrokerException: CODE: 2  DESC: [TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: 208ms, size of queue: 17
> For more information, please visit the url, http://rocketmq.apache.org/docs/faq/
>       at org.apache.rocketmq.client.impl.MQClientAPIImpl.processSendResponse(MQClientAPIImpl.java:531) ~[rocketmq-client-4.0.0-incubating.jar!/:4.0.0-incubating]
>       at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:345) ~[rocketmq-client-4.0.0-incubating.jar!/:4.0.0-incubating]
>       at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:327) ~[rocketmq-client-4.0.0-incubating.jar!/:4.0.0-incubating]
>       at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:290) ~[rocketmq-client-4.0.0-incubating.jar!/:4.0.0-incubating]
>       at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:688) ~[rocketmq-client-4.0.0-incubating.jar!/:4.0.0-incubating]
>       at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:458) ~[rocketmq-client-4.0.0-incubating.jar!/:4.0.0-incubating]
>       at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1049) ~[rocketmq-client-4.0.0-incubating.jar!/:4.0.0-incubating]
>       at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1008) ~[rocketmq-client-4.0.0-incubating.jar!/:4.0.0-incubating]
>       at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:204) ~[rocketmq-client-4.0.0-incubating.jar!/:4.0.0-incubating]
> I then looked into store.log on the RocketMQ broker and found the following
> line at the same time:
> 2017-08-10 09:06:57 INFO FlushRealTimeService - Flush data to disk costs 1240 ms
> I looked into the source code and found that RocketMQ has index files that
> it does not flush immediately (because they are written randomly). Only
> when an index file is completely written (about 500MB) does RocketMQ force
> it to disk, which means there can be up to about 500MB of dirty pages. (I
> have executed bin/os.sh from RocketMQ, which changes the default behavior
> of Linux pdflush.) Because [vm.dirty_ratio] is 50 and the available memory
> on my Linux machine is about 1600MB, 500MB does not exceed 50% of 1600MB,
> so pdflush is not triggered by the dirty ratio. So I guess it is the
> periodic writeback that impacts the RT of writes.
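The threshold arithmetic in the paragraph above can be written out as a minimal sketch. It assumes, as the report does, that the ratio is applied to the available memory figure; the class and method names (DirtyThreshold, thresholdMb, exceeds) are hypothetical and only serve the illustration.

```java
public class DirtyThreshold {
    // Approximate dirty-page threshold in MB: ratioPercent of the memory the
    // ratio is applied to (assumed here to be the "available" figure).
    static long thresholdMb(long availableMb, int ratioPercent) {
        return availableMb * ratioPercent / 100;
    }

    // True if the given amount of dirty data is above the threshold.
    static boolean exceeds(long dirtyMb, long availableMb, int ratioPercent) {
        return dirtyMb > thresholdMb(availableMb, ratioPercent);
    }

    public static void main(String[] args) {
        long availableMb = 1600; // from the report
        long indexFileMb = 500;  // from the report
        int dirtyRatio = 50;     // vm.dirty_ratio from the report
        System.out.println("threshold MB: " + thresholdMb(availableMb, dirtyRatio));
        System.out.println("exceeds: " + exceeds(indexFileMb, availableMb, dirtyRatio));
    }
}
```

With the numbers from the report the threshold is 800MB, so a 500MB index file stays below it.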
> So I wrote a test case that confirms this. The code is:
> import java.io.File;
> import java.io.IOException;
> import java.io.RandomAccessFile;
> import java.nio.MappedByteBuffer;
> import java.nio.channels.FileChannel;
> import java.nio.channels.FileChannel.MapMode;
> import java.util.Arrays;
>
> public class MappedFileTest {
>     public static void main(String[] args) throws IOException, InterruptedException {
>         // Mock RocketMQ's index file.
>         String indexFile = "/home/admin/rocketmq-data/index/index";
>         int indexFileToWriteInMB = 180;
>         FileChannel indexFileChannel = new RandomAccessFile(new File(indexFile), "rw").getChannel();
>         final MappedByteBuffer indexFileBuffer = indexFileChannel.map(MapMode.READ_WRITE, 0, 1024*1024*500); // 500MB
>         // Write some dirty data. Note that the data size must stay below
>         // vm.dirty_background_ratio. Because vm.dirty_expire_centisecs is set
>         // to 3000, pdflush writes the dirty data back to disk after 30 seconds.
>         byte[] bs = new byte[1024*1024*indexFileToWriteInMB]; // 180MB
>         Arrays.fill(bs, (byte) 1);
>         indexFileBuffer.put(bs);
>         // Mock RocketMQ's commitlog file.
>         String commitLogFile = "/home/admin/rocketmq-data/commitlog/commitlog";
>         FileChannel commitLogChannel = new RandomAccessFile(new File(commitLogFile), "rw").getChannel();
>         final MappedByteBuffer commitLogBuffer = commitLogChannel.map(MapMode.READ_WRITE, 0, 1024*1024*1024); // 1GB
>         final Object lockObj = new Object();
>         // Mock FlushCommitLogService, which forces the commitLog's dirty data
>         // to disk.
>         FlushCommitLogService commitLogService = new FlushCommitLogService(lockObj, commitLogBuffer);
>         commitLogService.start();
>         // Mock messageReceived, which writes data into the commitLog file.
>         mockMessageReceived(lockObj, commitLogBuffer);
>         // After about 30 seconds (when Linux pdflush writes back the dirty
>         // data of the index file) you will see output like the lines below.
>         // This blocks the thread that handles messages, so clients fail to
>         // send messages until pdflush is done:
>         //---write cost ms: 213
>         //flushToDisk cost ms:502
>     }
>     private static void mockMessageReceived(Object lockObj, MappedByteBuffer commitLogBuffer) {
>         byte[] bs = new byte[1024*10]; // 10KB
>         Arrays.fill(bs, (byte) 1);
>         for (int i = 0; i < 1000; i++) {
>             commitLogBuffer.position(bs.length * i);
>             long start = System.currentTimeMillis();
>             commitLogBuffer.put(bs);
>             long costMs = System.currentTimeMillis() - start;
>             // Only report writes that took longer than 1 ms.
>             if (costMs > 1) {
>                 System.out.println("---write cost ms:" + costMs);
>             }
>             // Wake the flush thread after every second write (every 20KB).
>             if (i != 0 && i % 2 == 0) {
>                 synchronized (lockObj) {
>                     lockObj.notify();
>                 }
>             }
>             try {
>                 Thread.sleep(50);
>             } catch (InterruptedException e) {
>                 e.printStackTrace();
>             }
>         }
>     }
>     public static class FlushCommitLogService extends Thread {
>         private final Object lockObj;
>         private final MappedByteBuffer commitLogBuffer;
>         public FlushCommitLogService(Object lockObj, MappedByteBuffer commitLogBuffer) {
>             this.lockObj = lockObj;
>             this.commitLogBuffer = commitLogBuffer;
>             // Daemon thread, so the JVM can exit once main() returns.
>             setDaemon(true);
>         }
>         @Override
>         public void run() {
>             while (true) {
>                 // Wait until the writer signals that new data is available.
>                 synchronized (lockObj) {
>                     try {
>                         lockObj.wait();
>                     } catch (InterruptedException e) {
>                         e.printStackTrace();
>                     }
>                 }
>                 System.out.println("flushToDisk started");
>                 long start = System.currentTimeMillis();
>                 commitLogBuffer.force();
>                 long costMs = System.currentTimeMillis() - start;
>                 System.out.println("flushToDisk cost ms:" + costMs);
>             }
>         }
>     }
> }
> Before running this test case, make sure you run it on Linux and apply the
> following Linux settings:
> vm.dirty_background_ratio = 50
> vm.dirty_ratio = 50
> vm.dirty_expire_centisecs = 3000
> vm.dirty_writeback_centisecs = 500
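To confirm that the settings above are actually in effect before running the test, a small sketch like the following could read them back from /proc/sys/vm, where Linux exposes the vm.* sysctls as files. The class and helper names (DirtySettings, parseValue, centisecsToSeconds) are hypothetical.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class DirtySettings {
    // The four sysctl keys the test case depends on.
    static final String[] KEYS = {
        "dirty_background_ratio", "dirty_ratio",
        "dirty_expire_centisecs", "dirty_writeback_centisecs"
    };

    // Parses the content of one /proc/sys/vm file, e.g. "3000\n".
    static long parseValue(String content) {
        return Long.parseLong(content.trim());
    }

    // Converts centiseconds (the unit of the *_centisecs keys) to seconds.
    static double centisecsToSeconds(long centisecs) {
        return centisecs / 100.0;
    }

    public static void main(String[] args) {
        for (String key : KEYS) {
            Path path = Paths.get("/proc/sys/vm", key);
            try {
                String content = new String(Files.readAllBytes(path), StandardCharsets.UTF_8);
                long value = parseValue(content);
                String seconds = key.endsWith("_centisecs")
                        ? " (" + centisecsToSeconds(value) + " s)" : "";
                System.out.println("vm." + key + " = " + value + seconds);
            } catch (IOException | NumberFormatException e) {
                // Not on Linux, or the file is not readable.
                System.out.println("vm." + key + " unavailable: " + e);
            }
        }
    }
}
```

The conversion helper also makes the units explicit: 3000 centiseconds is 30 seconds, and 360000 centiseconds is one hour.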
> The code does the following:
> 1. Opens an index file and writes some dirty data into it (without flushing
> manually, leaving it to Linux pdflush to write it back to disk).
> 2. Opens a commitLog file and writes dirty data into it; every time the
> dirty data reaches 20KB, it notifies another thread to force the data to
> disk. In the output, the RT is healthy at this point.
> 3. Waits about 30 seconds, until pdflush wakes up, finds that the dirty data
> in the index file has existed for 30 seconds, and writes it back to disk
> (you can run [cat /proc/meminfo|grep Dirty] to see the total size of dirty
> data). At the same time, the output shows lines like this:
> ---write cost ms: 213
> flushToDisk cost ms:502
> This write RT is unacceptable: it blocks the thread that handles messages,
> so clients fail to send messages until pdflush is done. And the more dirty
> data there is in the index file, the larger the write RT becomes.
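Instead of running the cat command by hand, the Dirty counter could be sampled alongside the test with a small poller, so a drop in dirty data can be lined up with the slow writes. This is only a sketch; the class name DirtyMonitor, the helper parseDirtyKb, and the default of five samples are hypothetical.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

public class DirtyMonitor {
    // Returns the value of the "Dirty:" line in kB, or -1 if it is absent.
    static long parseDirtyKb(List<String> meminfoLines) {
        for (String line : meminfoLines) {
            if (line.startsWith("Dirty:")) {
                // Line format: "Dirty:              1234 kB"
                String[] parts = line.substring("Dirty:".length()).trim().split("\\s+");
                return Long.parseLong(parts[0]);
            }
        }
        return -1;
    }

    public static void main(String[] args) throws InterruptedException {
        // Number of one-second samples; the first argument overrides the default.
        int samples = args.length > 0 ? Integer.parseInt(args[0]) : 5;
        for (int i = 0; i < samples; i++) {
            try {
                List<String> lines =
                        Files.readAllLines(Paths.get("/proc/meminfo"), StandardCharsets.UTF_8);
                System.out.println(System.currentTimeMillis() + " Dirty kB: " + parseDirtyKb(lines));
            } catch (IOException e) {
                // Not on Linux, or /proc is not mounted.
                System.out.println("/proc/meminfo unavailable: " + e);
                return;
            }
            Thread.sleep(1000);
        }
    }
}
```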
> bin/os.sh sets vm.dirty_writeback_centisecs to 360000 (one hour), and in
> store.log I found that the log [Flush data to disk costs * ms] appears
> hourly, which makes my guess more plausible.
> I noticed that the index file has three sections: the first is the header,
> the second is an index written randomly, and the third is another type of
> index written sequentially. So maybe the second section could be moved into
> a separate file, which would use only 20MB of memory and make writeback
> faster; the third section could then be written sequentially and written
> back to disk as soon as possible, just like the commitlog. But when
> recovering from a system crash, keeping these two index files consistent
> is also a problem.
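The three-section layout described above can be sketched as offset arithmetic. All constants below are assumptions chosen for illustration (a 40-byte header, 5,000,000 hash slots of 4 bytes, 20,000,000 entries of 20 bytes); they are not stated in the report, although the slot section they produce matches the 20MB figure it mentions. The class and method names are hypothetical.

```java
public class IndexLayout {
    // All sizes are assumptions for illustration, not taken from the report.
    static final long HEADER_BYTES = 40;
    static final long SLOT_COUNT = 5_000_000L;
    static final long SLOT_BYTES = 4;
    static final long ENTRY_COUNT = 20_000_000L;
    static final long ENTRY_BYTES = 20;

    static long slotSectionBytes() {
        return SLOT_COUNT * SLOT_BYTES;
    }

    static long entrySectionBytes() {
        return ENTRY_COUNT * ENTRY_BYTES;
    }

    static long totalBytes() {
        return HEADER_BYTES + slotSectionBytes() + entrySectionBytes();
    }

    // Section 2: the slot position depends on the key hash, so consecutive
    // messages touch pages scattered over the whole slot section.
    static long slotOffset(int keyHash) {
        long slot = Math.floorMod((long) keyHash, SLOT_COUNT);
        return HEADER_BYTES + slot * SLOT_BYTES;
    }

    // Section 3: the n-th entry is appended right after the (n-1)-th one.
    static long entryOffset(long n) {
        return HEADER_BYTES + slotSectionBytes() + n * ENTRY_BYTES;
    }

    public static void main(String[] args) {
        System.out.println("slot section bytes:  " + slotSectionBytes());
        System.out.println("entry section bytes: " + entrySectionBytes());
        System.out.println("total bytes:         " + totalBytes());
        System.out.println("slot offset for hash of \"key-1\": " + slotOffset("key-1".hashCode()));
        System.out.println("offset of entry 0: " + entryOffset(0));
    }
}
```

Under these assumptions only the slot section is written randomly, which is why splitting it into its own small file would bound the amount of randomly dirtied memory.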



