[RocketMQ-58] Add integration test for RocketMQ, also thanks @fenglianghfl for 
this commit, closes apache/incubator-rocketmq#46


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/788771a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/788771a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/788771a8

Branch: refs/heads/master
Commit: 788771a851512f1e588ebeb6952e61fe8d892804
Parents: 581039b
Author: dongeforever <[email protected]>
Authored: Fri Jan 20 11:36:31 2017 +0800
Committer: yukon <[email protected]>
Committed: Fri Jan 20 11:36:31 2017 +0800

----------------------------------------------------------------------
 .travis.yml                                     |   1 +
 BUILDING                                        |   2 +-
 pom.xml                                         |  73 +++--
 test/pom.xml                                    |  52 ++++
 .../test/client/mq/MQAsyncProducer.java         |  85 ++++++
 .../test/client/rmq/RMQAsyncSendProducer.java   | 226 ++++++++++++++
 .../test/client/rmq/RMQBroadCastConsumer.java   |  37 +++
 .../test/client/rmq/RMQNormalConsumer.java      |  90 ++++++
 .../test/client/rmq/RMQNormalProducer.java      | 167 ++++++++++
 .../clientinterface/AbstractMQConsumer.java     | 112 +++++++
 .../clientinterface/AbstractMQProducer.java     | 151 +++++++++
 .../test/clientinterface/MQCollector.java       | 108 +++++++
 .../test/clientinterface/MQConsumer.java        |  26 ++
 .../test/clientinterface/MQProducer.java        |  30 ++
 .../rocketmq/test/factory/ConsumerFactory.java  |  45 +++
 .../rocketmq/test/factory/MQMessageFactory.java | 128 ++++++++
 .../rocketmq/test/factory/MessageFactory.java   |  62 ++++
 .../rocketmq/test/factory/ProducerFactory.java  |  37 +++
 .../test/factory/SendCallBackFactory.java       |  35 +++
 .../rocketmq/test/factory/TagMessage.java       | 108 +++++++
 .../test/listener/AbstractListener.java         | 104 +++++++
 .../rmq/concurrent/RMQDelayListner.java         |  61 ++++
 .../rmq/concurrent/RMQNormalListner.java        |  70 +++++
 .../listener/rmq/order/RMQOrderListener.java    |  85 ++++++
 .../rocketmq/test/message/MessageQueueMsg.java  |  62 ++++
 .../rocketmq/test/sendresult/SendResult.java    |  62 ++++
 .../apache/rocketmq/test/util/Condition.java    |  23 ++
 .../test/util/DuplicateMessageInfo.java         | 137 +++++++++
 .../org/apache/rocketmq/test/util/FileUtil.java | 108 +++++++
 .../org/apache/rocketmq/test/util/MQAdmin.java  | 164 ++++++++++
 .../rocketmq/test/util/MQRandomUtils.java       |  28 ++
 .../org/apache/rocketmq/test/util/MQWait.java   |  93 ++++++
 .../apache/rocketmq/test/util/RandomUtil.java   | 306 +++++++++++++++++++
 .../apache/rocketmq/test/util/RandomUtils.java  |  91 ++++++
 .../org/apache/rocketmq/test/util/TestUtil.java | 122 ++++++++
 .../apache/rocketmq/test/util/TestUtils.java    |  50 +++
 .../apache/rocketmq/test/util/VerifyUtils.java  | 146 +++++++++
 .../test/util/data/collect/DataCollector.java   |  45 +++
 .../util/data/collect/DataCollectorManager.java | 112 +++++++
 .../test/util/data/collect/DataFilter.java      |  22 ++
 .../collect/impl/ListDataCollectorImpl.java     |  95 ++++++
 .../data/collect/impl/MapDataCollectorImpl.java | 111 +++++++
 .../test/util/parallel/ParallelTask.java        |  43 +++
 .../util/parallel/ParallelTaskExecutor.java     |  67 ++++
 .../rocketmq/test/util/parallel/Task4Test.java  |  30 ++
 .../org/apache/rocketmq/test/base/BaseConf.java | 162 ++++++++++
 .../rocketmq/test/base/IntegrationTestBase.java | 143 +++++++++
 .../balance/NormalMsgDynamicBalanceIT.java      | 111 +++++++
 .../balance/NormalMsgStaticBalanceIT.java       | 109 +++++++
 .../consumer/broadcast/BaseBroadCastIT.java     |  56 ++++
 .../normal/BroadCastNormalMsgNotRecvIT.java     |  73 +++++
 .../normal/BroadCastNormalMsgRecvCrashIT.java   |  90 ++++++
 .../normal/BroadCastNormalMsgRecvFailIT.java    |  72 +++++
 .../BroadCastNormalMsgRecvStartLaterIT.java     |  88 ++++++
 .../BroadCastNormalMsgTwoDiffGroupRecvIT.java   |  78 +++++
 .../normal/NormalMsgTwoSameGroupConsumerIT.java |  78 +++++
 .../broadcast/order/OrderMsgBroadCastIT.java    |  75 +++++
 .../tag/BroadCastTwoConsumerFilterIT.java       |  78 +++++
 .../tag/BroadCastTwoConsumerSubDiffTagIT.java   |  75 +++++
 .../tag/BroadCastTwoConsumerSubTagIT.java       |  75 +++++
 .../consumer/cluster/DynamicAddAndCrashIT.java  | 103 +++++++
 .../consumer/cluster/DynamicAddConsumerIT.java  |  97 ++++++
 .../cluster/DynamicCrashConsumerIT.java         | 100 ++++++
 .../test/client/consumer/tag/MulTagSubIT.java   | 156 ++++++++++
 .../consumer/tag/TagMessageWith1ConsumerIT.java | 197 ++++++++++++
 .../tag/TagMessageWithMulConsumerIT.java        | 196 ++++++++++++
 .../tag/TagMessageWithSameGroupConsumerIT.java  | 115 +++++++
 .../consumer/topic/MulConsumerMulTopicIT.java   | 108 +++++++
 .../consumer/topic/OneConsumerMulTopicIT.java   | 104 +++++++
 .../producer/async/AsyncSendExceptionIT.java    | 150 +++++++++
 .../async/AsyncSendWithMessageQueueIT.java      |  85 ++++++
 .../AsyncSendWithMessageQueueSelectorIT.java    | 106 +++++++
 .../async/AsyncSendWithOnlySendCallBackIT.java  |  64 ++++
 .../producer/exception/msg/ChinaPropIT.java     |  74 +++++
 .../exception/msg/MessageExceptionIT.java       | 129 ++++++++
 .../exception/msg/MessageUserPropIT.java        |  94 ++++++
 .../ProducerGroupAndInstanceNameValidityIT.java |  69 +++++
 .../producer/oneway/OneWaySendExceptionIT.java  |  78 +++++
 .../client/producer/oneway/OneWaySendIT.java    |  64 ++++
 .../producer/oneway/OneWaySendWithMQIT.java     |  79 +++++
 .../oneway/OneWaySendWithSelectorIT.java        | 104 +++++++
 .../order/OrderMsgDynamicRebalanceIT.java       | 115 +++++++
 .../test/client/producer/order/OrderMsgIT.java  | 108 +++++++
 .../producer/order/OrderMsgRebalanceIT.java     | 133 ++++++++
 .../producer/order/OrderMsgWithTagIT.java       | 169 ++++++++++
 .../querymsg/QueryMsgByIdExceptionIT.java       |  81 +++++
 .../producer/querymsg/QueryMsgByIdIT.java       |  75 +++++
 .../producer/querymsg/QueryMsgByKeyIT.java      | 104 +++++++
 .../apache/rocketmq/test/delay/DelayConf.java   |  27 ++
 .../rocketmq/test/delay/NormalMsgDelayIT.java   | 116 +++++++
 .../test/smoke/NormalMessageSendAndRecvIT.java  |  62 ++++
 test/src/test/resources/log4j.xml               |  46 +++
 test/src/test/resources/logback-test.xml        |  33 ++
 93 files changed, 8492 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index dd57ba3..916cac5 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -39,4 +39,5 @@ script:
   - travis_retry mvn -B package jacoco:report coveralls:report
 
 after_success:
+  - mvn clean install -Pit-test
   - mvn sonar:sonar

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/BUILDING
----------------------------------------------------------------------
diff --git a/BUILDING b/BUILDING
index a92cbd5..1498b3e 100644
--- a/BUILDING
+++ b/BUILDING
@@ -34,4 +34,4 @@ Then, import to eclipse by specifying the root directory of 
the project via:
 
 Execute the following command in order to build the tar.gz packages and 
install JAR to the local repository:
 
-$ mvn clean package install -Prelease-all assembly:assembly -U
\ No newline at end of file
+$ mvn clean install -Prelease-all assembly:assembly -U
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 8ec8170..8b8c468 100644
--- a/pom.xml
+++ b/pom.xml
@@ -160,9 +160,6 @@
         <!-- Compiler settings properties -->
         <maven.compiler.source>1.7</maven.compiler.source>
         <maven.compiler.target>1.7</maven.compiler.target>
-
-        <!-- Overwritten by the test configuration,otherwise the JaCoCo agent 
cannot be attached.Details see 
http://www.eclemma.org/jacoco/trunk/doc/prepare-agent-mojo.html -->
-        <argLine>-Xms512m -Xmx1024m</argLine>
         <sonar.java.coveragePlugin>jacoco</sonar.java.coveragePlugin>
         <!-- URL of the ASF SonarQube server -->
         <sonar.host.url>https://builds.apache.org/analysis</sonar.host.url>
@@ -181,6 +178,7 @@
         <module>example</module>
         <module>filtersrv</module>
         <module>srvutil</module>
+        <module>test</module>
     </modules>
 
     <build>
@@ -237,25 +235,6 @@
                 </configuration>
             </plugin>
             <plugin>
-                <artifactId>maven-surefire-plugin</artifactId>
-                <version>2.19.1</version>
-                <configuration>
-                    <forkCount>1</forkCount>
-                    <reuseForks>true</reuseForks>
-                    <includes>
-                        <include>**/*Test.java</include>
-                    </includes>
-                </configuration>
-            </plugin>
-            <plugin>
-                <artifactId>maven-failsafe-plugin</artifactId>
-                <version>2.19.1</version>
-                <configuration>
-                    <forkCount>1</forkCount>
-                    <reuseForks>true</reuseForks>
-                </configuration>
-            </plugin>
-            <plugin>
                 <artifactId>maven-javadoc-plugin</artifactId>
                 <version>2.10.4</version>
                 <configuration>
@@ -310,6 +289,7 @@
                             <encoding>UTF-8</encoding>
                             <consoleOutput>true</consoleOutput>
                             <failsOnError>true</failsOnError>
+                            
<includeTestSourceDirectory>false</includeTestSourceDirectory>
                         </configuration>
                         <goals>
                             <goal>check</goal>
@@ -352,12 +332,20 @@
                         <goals>
                             <goal>prepare-agent</goal>
                         </goals>
+                        <configuration>
+                            
<destFile>${project.build.directory}/jacoco.exec</destFile>
+                        </configuration>
                     </execution>
                     <execution>
                         <id>default-prepare-agent-integration</id>
+                        <phase>pre-integration-test</phase>
                         <goals>
                             <goal>prepare-agent-integration</goal>
                         </goals>
+                        <configuration>
+                            
<destFile>${project.build.directory}/jacoco-it.exec</destFile>
+                            <propertyName>failsafeArgLine</propertyName>
+                        </configuration>
                     </execution>
                     <execution>
                         <id>default-report</id>
@@ -374,6 +362,14 @@
                 </executions>
             </plugin>
             <plugin>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.19.1</version>
+                <configuration>
+                    <forkCount>1</forkCount>
+                    <reuseForks>true</reuseForks>
+                </configuration>
+            </plugin>
+            <plugin>
                 <groupId>org.codehaus.mojo</groupId>
                 <artifactId>findbugs-maven-plugin</artifactId>
                 <version>3.0.4</version>
@@ -475,6 +471,37 @@
                 </plugins>
             </build>
         </profile>
+        <profile>
+            <id>it-test</id>
+            <build>
+                <plugins>
+                    <plugin>
+                        <artifactId>maven-failsafe-plugin</artifactId>
+                        <version>2.19.1</version>
+                        <configuration>
+                            <forkCount>1</forkCount>
+                            <reuseForks>true</reuseForks>
+                            <argLine>@{failsafeArgLine}</argLine>
+                            <excludes>
+                                <exclude>**/NormalMsgDelayIT.java</exclude>
+                                
<exclude>**/BroadCastNormalMsgNotRecvIT.java</exclude>
+                                
<exclude>**/TagMessageWithSameGroupConsumerIT.java</exclude>
+                                
<exclude>**/AsyncSendWithMessageQueueSelectorIT.java</exclude>
+                                
<exclude>**/AsyncSendWithMessageQueueIT.java</exclude>
+                            </excludes>
+                        </configuration>
+                        <executions>
+                            <execution>
+                                <goals>
+                                    <goal>integration-test</goal>
+                                    <goal>verify</goal>
+                                </goals>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
     </profiles>
 
     <dependencies>
@@ -537,7 +564,7 @@
             </dependency>
             <dependency>
                 <groupId>${project.groupId}</groupId>
-                <artifactId>rocketmq-qatest</artifactId>
+                <artifactId>rocketmq-test</artifactId>
                 <version>${project.version}</version>
             </dependency>
             <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/pom.xml
----------------------------------------------------------------------
diff --git a/test/pom.xml b/test/pom.xml
new file mode 100644
index 0000000..09ec9d3
--- /dev/null
+++ b/test/pom.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  -->
+
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>rocketmq-all</artifactId>
+        <groupId>org.apache.rocketmq</groupId>
+        <version>4.0.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>rocketmq-test</artifactId>
+
+
+    <dependencies>
+        <dependency>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
+            <version>1.2.17</version>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>rocketmq-broker</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>rocketmq-namesrv</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.google.truth</groupId>
+            <artifactId>truth</artifactId>
+            <version>0.30</version>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/client/mq/MQAsyncProducer.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/rocketmq/test/client/mq/MQAsyncProducer.java 
b/test/src/main/java/org/apache/rocketmq/test/client/mq/MQAsyncProducer.java
new file mode 100644
index 0000000..6b2357b
--- /dev/null
+++ b/test/src/main/java/org/apache/rocketmq/test/client/mq/MQAsyncProducer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.mq;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.test.clientinterface.AbstractMQProducer;
+import org.apache.rocketmq.test.util.TestUtil;
+
+public class MQAsyncProducer {
+    private static Logger logger = Logger.getLogger(MQAsyncProducer.class);
+    private AbstractMQProducer producer = null;
+    private long msgNum;
+    private int intervalMills;
+    private Thread sendT;
+    private AtomicBoolean bPause = new AtomicBoolean(false);
+
+    public MQAsyncProducer(final AbstractMQProducer producer, final long 
msgNum,
+        final int intervalMills) {
+        this.producer = producer;
+        this.msgNum = msgNum;
+        this.intervalMills = intervalMills;
+
+        sendT = new Thread(new Runnable() {
+            public void run() {
+                for (int i = 0; i < msgNum; i++) {
+                    if (!bPause.get()) {
+                        producer.send();
+                        TestUtil.waitForMonment(intervalMills);
+                    } else {
+                        while (true) {
+                            if (bPause.get()) {
+                                TestUtil.waitForMonment(10);
+                            } else
+                                break;
+                        }
+                    }
+
+                }
+            }
+        });
+
+    }
+
+    public void start() {
+        sendT.start();
+    }
+
+    public void waitSendAll(int waitMills) {
+        long startTime = System.currentTimeMillis();
+        while ((producer.getAllMsgBody().size() + 
producer.getSendErrorMsg().size()) < msgNum) {
+            if (System.currentTimeMillis() - startTime < waitMills) {
+                TestUtil.waitForMonment(200);
+            } else {
+                logger.error(String.format("time elapse:%s, but the message 
sending has not finished",
+                    System.currentTimeMillis() - startTime));
+                break;
+            }
+        }
+    }
+
+    public void pauseProducer() {
+        bPause.set(true);
+    }
+
+    public void notifyProducer() {
+        bPause.set(false);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQAsyncSendProducer.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQAsyncSendProducer.java
 
b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQAsyncSendProducer.java
new file mode 100644
index 0000000..4a2ce2b
--- /dev/null
+++ 
b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQAsyncSendProducer.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.rmq;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.MessageQueueSelector;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.test.clientinterface.AbstractMQProducer;
+import org.apache.rocketmq.test.sendresult.SendResult;
+import org.apache.rocketmq.test.util.RandomUtil;
+import org.apache.rocketmq.test.util.TestUtil;
+
+public class RMQAsyncSendProducer extends AbstractMQProducer {
+    private static Logger logger = Logger
+        .getLogger(RMQAsyncSendProducer.class);
+    private String nsAddr = null;
+    private DefaultMQProducer producer = null;
+    private SendCallback sendCallback = null;
+    private List<org.apache.rocketmq.client.producer.SendResult> 
successSendResult = new 
ArrayList<org.apache.rocketmq.client.producer.SendResult>();
+    private AtomicInteger exceptionMsgCount = new AtomicInteger(
+        0);
+    private int msgSize = 0;
+
+    public RMQAsyncSendProducer(String nsAddr, String topic) {
+        super(topic);
+        this.nsAddr = nsAddr;
+        sendCallback = new SendCallback() {
+            public void 
onSuccess(org.apache.rocketmq.client.producer.SendResult sendResult) {
+                successSendResult.add(sendResult);
+            }
+
+            public void onException(Throwable throwable) {
+                exceptionMsgCount.getAndIncrement();
+            }
+        };
+
+        create();
+        start();
+    }
+
+    public int getSuccessMsgCount() {
+        return successSendResult.size();
+    }
+
+    public List<org.apache.rocketmq.client.producer.SendResult> 
getSuccessSendResult() {
+        return successSendResult;
+    }
+
+    public int getExceptionMsgCount() {
+        return exceptionMsgCount.get();
+    }
+
+    private void create() {
+        producer = new DefaultMQProducer();
+        producer.setProducerGroup(RandomUtil.getStringByUUID());
+        producer.setInstanceName(RandomUtil.getStringByUUID());
+
+        if (nsAddr != null) {
+            producer.setNamesrvAddr(nsAddr);
+        }
+
+    }
+
+    private void start() {
+        try {
+            producer.start();
+        } catch (MQClientException e) {
+            logger.error("producer start failed!");
+            e.printStackTrace();
+        }
+    }
+
+    public SendResult send(Object msg, Object arg) {
+        return null;
+    }
+
+    public void shutdown() {
+        producer.shutdown();
+    }
+
+    public void asyncSend(Object msg) {
+        Message metaqMsg = (Message) msg;
+        try {
+            producer.send(metaqMsg, sendCallback);
+            msgBodys.addData(new String(metaqMsg.getBody()));
+            originMsgs.addData(msg);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    public void asyncSend(int msgSize) {
+        this.msgSize = msgSize;
+
+        for (int i = 0; i < msgSize; i++) {
+            Message msg = new Message(topic, 
RandomUtil.getStringByUUID().getBytes());
+            this.asyncSend(msg);
+        }
+    }
+
+    public void asyncSend(Object msg, MessageQueueSelector selector, Object 
arg) {
+        Message metaqMsg = (Message) msg;
+        try {
+            producer.send(metaqMsg, selector, arg, sendCallback);
+            msgBodys.addData(new String(metaqMsg.getBody()));
+            originMsgs.addData(msg);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    public void asyncSend(int msgSize, MessageQueueSelector selector) {
+        this.msgSize = msgSize;
+        for (int i = 0; i < msgSize; i++) {
+            Message msg = new Message(topic, 
RandomUtil.getStringByUUID().getBytes());
+            this.asyncSend(msg, selector, i);
+        }
+    }
+
+    public void asyncSend(Object msg, MessageQueue mq) {
+        Message metaqMsg = (Message) msg;
+        try {
+            producer.send(metaqMsg, mq, sendCallback);
+            msgBodys.addData(new String(metaqMsg.getBody()));
+            originMsgs.addData(msg);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    public void asyncSend(int msgSize, MessageQueue mq) {
+        this.msgSize = msgSize;
+        for (int i = 0; i < msgSize; i++) {
+            Message msg = new Message(topic, 
RandomUtil.getStringByUUID().getBytes());
+            this.asyncSend(msg, mq);
+        }
+    }
+
+    public void waitForResponse(int timeoutMills) {
+        long startTime = System.currentTimeMillis();
+        while (this.successSendResult.size() != this.msgSize) {
+            if (System.currentTimeMillis() - startTime < timeoutMills) {
+                TestUtil.waitForMonment(100);
+            } else {
+                logger.info("timeout but still not recv all response!");
+                break;
+            }
+        }
+    }
+
+    public void sendOneWay(Object msg) {
+        Message metaqMsg = (Message) msg;
+        try {
+            producer.sendOneway(metaqMsg);
+            msgBodys.addData(new String(metaqMsg.getBody()));
+            originMsgs.addData(msg);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    public void sendOneWay(int msgSize) {
+        for (int i = 0; i < msgSize; i++) {
+            Message msg = new Message(topic, 
RandomUtil.getStringByUUID().getBytes());
+            this.sendOneWay(msg);
+        }
+    }
+
+    public void sendOneWay(Object msg, MessageQueue mq) {
+        Message metaqMsg = (Message) msg;
+        try {
+            producer.sendOneway(metaqMsg, mq);
+            msgBodys.addData(new String(metaqMsg.getBody()));
+            originMsgs.addData(msg);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    public void sendOneWay(int msgSize, MessageQueue mq) {
+        for (int i = 0; i < msgSize; i++) {
+            Message msg = new Message(topic, 
RandomUtil.getStringByUUID().getBytes());
+            this.sendOneWay(msg, mq);
+        }
+    }
+
+    public void sendOneWay(Object msg, MessageQueueSelector selector, Object 
arg) {
+        Message metaqMsg = (Message) msg;
+        try {
+            producer.sendOneway(metaqMsg, selector, arg);
+            msgBodys.addData(new String(metaqMsg.getBody()));
+            originMsgs.addData(msg);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    public void sendOneWay(int msgSize, MessageQueueSelector selector) {
+        for (int i = 0; i < msgSize; i++) {
+            Message msg = new Message(topic, 
RandomUtil.getStringByUUID().getBytes());
+            this.sendOneWay(msg, selector, i);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQBroadCastConsumer.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQBroadCastConsumer.java
 
b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQBroadCastConsumer.java
new file mode 100644
index 0000000..8af49ea
--- /dev/null
+++ 
b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQBroadCastConsumer.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.rmq;
+
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.test.listener.AbstractListener;
+
+/**
+ * Variant of {@link RMQNormalConsumer} whose underlying consumer is switched to
+ * {@link MessageModel#BROADCASTING} right after it is created.
+ */
+public class RMQBroadCastConsumer extends RMQNormalConsumer {
+    private static Logger logger = Logger.getLogger(RMQBroadCastConsumer.class);
+
+    /** Forwards every argument unchanged to the {@link RMQNormalConsumer} constructor. */
+    public RMQBroadCastConsumer(
+        String nsAddr,
+        String topic,
+        String subExpression,
+        String consumerGroup,
+        AbstractListener listner) {
+        super(nsAddr, topic, subExpression, consumerGroup, listner);
+    }
+
+    /** Builds the consumer via the parent implementation, then selects the broadcasting model. */
+    @Override
+    public void create() {
+        super.create();
+        this.consumer.setMessageModel(MessageModel.BROADCASTING);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java 
b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java
new file mode 100644
index 0000000..3f185d3
--- /dev/null
+++ 
b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.rmq;
+
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.test.clientinterface.AbstractMQConsumer;
+import org.apache.rocketmq.test.listener.AbstractListener;
+import org.apache.rocketmq.test.util.RandomUtil;
+
+/**
+ * Test-side wrapper around a {@link DefaultMQPushConsumer}.
+ *
+ * <p>The constructor only records settings. Call {@link #create()} to build the client
+ * and {@link #start()} to start it. Client exceptions raised while subscribing or
+ * starting are logged (with their stack trace) and not rethrown.
+ */
+public class RMQNormalConsumer extends AbstractMQConsumer {
+    private static final Logger logger = Logger.getLogger(RMQNormalConsumer.class);
+    /** Underlying client; {@code null} until {@link #create()} has been called. */
+    protected DefaultMQPushConsumer consumer = null;
+
+    /**
+     * @param nsAddr name server address handed to the client
+     * @param topic topic to subscribe to
+     * @param subExpression subscription expression passed to the client's subscribe call
+     * @param consumerGroup consumer group used to construct the client
+     * @param listner message listener registered on the client
+     */
+    public RMQNormalConsumer(String nsAddr, String topic, String subExpression,
+        String consumerGroup, AbstractListener listner) {
+        super(nsAddr, topic, subExpression, consumerGroup, listner);
+    }
+
+    @Override
+    public AbstractListener getListner() {
+        return listner;
+    }
+
+    @Override
+    public void setListner(AbstractListener listner) {
+        this.listner = listner;
+    }
+
+    /**
+     * Builds a new client for {@code consumerGroup}, gives it a fresh instance name from
+     * {@link RandomUtil#getStringByUUID()}, points it at {@code nsAddr}, subscribes it to
+     * the configured topic/expression and registers the listener. Any previously created
+     * client reference is replaced.
+     */
+    public void create() {
+        consumer = new DefaultMQPushConsumer(consumerGroup);
+        consumer.setInstanceName(RandomUtil.getStringByUUID());
+        consumer.setNamesrvAddr(nsAddr);
+        subscribeAndLogFailure(topic, subExpression);
+        consumer.setMessageListener(listner);
+    }
+
+    /** Starts the client; a start failure is logged and otherwise ignored. */
+    public void start() {
+        try {
+            consumer.start();
+            logger.info(String.format("consumer[%s] started!", consumer.getConsumerGroup()));
+        } catch (MQClientException e) {
+            // Route the stack trace through the logger so it lands next to the message.
+            logger.error("consumer start failed!", e);
+        }
+    }
+
+    /**
+     * Adds a subscription on the current client. The {@code topic} and
+     * {@code subExpression} fields of this object are not updated, so a later
+     * {@link #restart()} re-subscribes using the stored field values only.
+     *
+     * @param topic topic to subscribe to
+     * @param subExpression subscription expression
+     */
+    public void subscribe(String topic, String subExpression) {
+        subscribeAndLogFailure(topic, subExpression);
+    }
+
+    /** Shuts the client down; does nothing if {@link #create()} was never called. */
+    public void shutdown() {
+        if (consumer != null) {
+            consumer.shutdown();
+        }
+    }
+
+    @Override
+    public void clearMsg() {
+        this.listner.clearMsg();
+    }
+
+    /**
+     * Shuts down the current client (if there is one), then builds and starts a new one
+     * via {@link #create()} and {@link #start()}.
+     */
+    public void restart() {
+        if (consumer != null) {
+            consumer.shutdown();
+        }
+        create();
+        start();
+    }
+
+    /** Subscribes the current client and logs, rather than propagates, a client error. */
+    private void subscribeAndLogFailure(String topic, String subExpression) {
+        try {
+            consumer.subscribe(topic, subExpression);
+        } catch (MQClientException e) {
+            logger.error("consumer subscribe failed!", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java 
b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java
new file mode 100644
index 0000000..26b77fe
--- /dev/null
+++ 
b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalProducer.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.rmq;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.test.clientinterface.AbstractMQProducer;
+import org.apache.rocketmq.test.sendresult.SendResult;
+
+/**
+ * Test-side wrapper around a {@link DefaultMQProducer} that sends synchronously and
+ * records every attempt in the collectors inherited from the parent classes.
+ *
+ * <p>Both constructors build and start the client immediately; check
+ * {@code isStartSuccess()} to find out whether the start succeeded. All send methods
+ * update and return the single shared {@code sendResult} instance, so a returned result
+ * is overwritten by the next send.
+ */
+public class RMQNormalProducer extends AbstractMQProducer {
+    private static final Logger logger = Logger.getLogger(RMQNormalProducer.class);
+    private DefaultMQProducer producer = null;
+    private String nsAddr = null;
+
+    /**
+     * Creates and starts a producer with the group and instance names chosen by the
+     * parent constructor.
+     *
+     * @param nsAddr name server address; when {@code null} none is set on the client
+     * @param topic topic used for messages generated by the parent class
+     */
+    public RMQNormalProducer(String nsAddr, String topic) {
+        super(topic);
+        this.nsAddr = nsAddr;
+        create();
+        start();
+    }
+
+    /**
+     * Creates and starts a producer with explicit group and instance names.
+     *
+     * @param nsAddr name server address; when {@code null} none is set on the client
+     * @param topic topic used for messages generated by the parent class
+     * @param producerGroupName producer group set on the client
+     * @param producerInstanceName instance name set on the client
+     */
+    public RMQNormalProducer(String nsAddr, String topic, String producerGroupName,
+        String producerInstanceName) {
+        super(topic);
+        this.producerGroupName = producerGroupName;
+        this.producerInstanceName = producerInstanceName;
+        this.nsAddr = nsAddr;
+
+        create();
+        start();
+    }
+
+    public DefaultMQProducer getProducer() {
+        return producer;
+    }
+
+    public void setProducer(DefaultMQProducer producer) {
+        this.producer = producer;
+    }
+
+    /** Builds the client and applies group name, instance name and (if set) name server. */
+    protected void create() {
+        producer = new DefaultMQProducer();
+        producer.setProducerGroup(getProducerGroupName());
+        producer.setInstanceName(getProducerInstanceName());
+
+        if (nsAddr != null) {
+            producer.setNamesrvAddr(nsAddr);
+        }
+    }
+
+    /** Starts the client and records the outcome via {@code setStartSuccess}. */
+    public void start() {
+        try {
+            producer.start();
+            super.setStartSuccess(true);
+        } catch (MQClientException e) {
+            super.setStartSuccess(false);
+            // Keep the stack trace in the log instead of dumping it on stderr.
+            logger.error("producer start failed!", e);
+        }
+    }
+
+    /**
+     * Sends one message and records the outcome.
+     *
+     * @param msg the message; must be a {@link Message}, otherwise the cast throws
+     *     {@link ClassCastException}
+     * @param orderKey not used by this implementation
+     * @return the shared result object, updated for this attempt
+     */
+    public SendResult send(Object msg, Object orderKey) {
+        Message metaqMsg = (Message) msg;
+        try {
+            long start = System.currentTimeMillis();
+            org.apache.rocketmq.client.producer.SendResult metaqResult = producer.send(metaqMsg);
+            recordSuccess(metaqMsg, metaqResult, System.currentTimeMillis() - start);
+        } catch (Exception e) {
+            recordFailure(msg, e);
+        }
+
+        return sendResult;
+    }
+
+    /**
+     * Sends every list of messages to the queue it is keyed by.
+     *
+     * @param msgs messages grouped by destination queue
+     */
+    public void send(Map<MessageQueue, List<Object>> msgs) {
+        for (Map.Entry<MessageQueue, List<Object>> entry : msgs.entrySet()) {
+            send(entry.getValue(), entry.getKey());
+        }
+    }
+
+    /**
+     * Sends each message of {@code msgs} to {@code mq}, in list order.
+     *
+     * @param msgs messages to send; every element must be a {@link Message}
+     * @param mq destination queue
+     */
+    public void send(List<Object> msgs, MessageQueue mq) {
+        for (Object msg : msgs) {
+            sendMQ((Message) msg, mq);
+        }
+    }
+
+    /**
+     * Sends one message to a specific queue and records the outcome.
+     *
+     * @param msg message to send
+     * @param mq destination queue
+     * @return the shared result object, updated for this attempt
+     */
+    public SendResult sendMQ(Message msg, MessageQueue mq) {
+        try {
+            long start = System.currentTimeMillis();
+            org.apache.rocketmq.client.producer.SendResult metaqResult = producer.send(msg, mq);
+            recordSuccess(msg, metaqResult, System.currentTimeMillis() - start);
+        } catch (Exception e) {
+            recordFailure(msg, e);
+        }
+
+        return sendResult;
+    }
+
+    public void shutdown() {
+        producer.shutdown();
+    }
+
+    /**
+     * Fetches the publish queues of the configured topic from the client.
+     *
+     * @return the queues, or {@code null} when the client reports an error
+     */
+    @Override
+    public List<MessageQueue> getMessageQueue() {
+        List<MessageQueue> mqs = null;
+        try {
+            mqs = producer.fetchPublishMessageQueues(topic);
+        } catch (MQClientException e) {
+            logger.error("fetch publish message queues failed, topic: " + topic, e);
+        }
+        return mqs;
+    }
+
+    /**
+     * Records a completed send: elapsed time, shared result fields, body, original
+     * message and the body-to-client-result index. Runs inside the caller's try block
+     * so that an exception here is still recorded as a failed send.
+     */
+    private void recordSuccess(Message msg,
+        org.apache.rocketmq.client.producer.SendResult metaqResult, long costMillis) {
+        this.msgRTs.addData(costMillis);
+        if (isDebug) {
+            logger.info(metaqResult);
+        }
+        sendResult.setMsgId(metaqResult.getMsgId());
+        sendResult.setSendResult(metaqResult.getSendStatus().equals(SendStatus.SEND_OK));
+        // Despite the setter's name, the value stored is the broker name of the queue.
+        sendResult.setBrokerIp(metaqResult.getMessageQueue().getBrokerName());
+        String body = new String(msg.getBody());
+        msgBodys.addData(body);
+        originMsgs.addData(msg);
+        originMsgIndex.put(body, metaqResult);
+    }
+
+    /** Records a failed send on the shared result and in the error collector. */
+    private void recordFailure(Object msg, Exception e) {
+        if (isDebug) {
+            logger.error("send message failed!", e);
+        }
+
+        sendResult.setSendResult(false);
+        sendResult.setSendException(e);
+        errorMsgs.addData(msg);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/clientinterface/AbstractMQConsumer.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/rocketmq/test/clientinterface/AbstractMQConsumer.java
 
b/test/src/main/java/org/apache/rocketmq/test/clientinterface/AbstractMQConsumer.java
new file mode 100644
index 0000000..a077129
--- /dev/null
+++ 
b/test/src/main/java/org/apache/rocketmq/test/clientinterface/AbstractMQConsumer.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.clientinterface;
+
+import org.apache.rocketmq.test.listener.AbstractListener;
+
+/**
+ * Base class for test consumers: holds the connection/subscription settings and the
+ * listener, and offers plain accessors for them.
+ *
+ * <p>The listener may be {@code null} (two of the constructors never set it), so every
+ * method that forwards to it checks for {@code null} first.
+ */
+public abstract class AbstractMQConsumer implements MQConsumer {
+    protected AbstractListener listner = null;
+    protected String nsAddr = null;
+    protected String topic = null;
+    protected String subExpression = null;
+    protected String consumerGroup = null;
+    protected boolean isDebug = false;
+
+    /** Leaves every setting at its default ({@code null} / {@code false}). */
+    public AbstractMQConsumer() {
+    }
+
+    /**
+     * @param nsAddr name server address
+     * @param topic topic to subscribe to
+     * @param subExpression subscription expression
+     * @param consumerGroup consumer group name
+     * @param listner listener that receives the messages
+     */
+    public AbstractMQConsumer(String nsAddr, String topic, String subExpression,
+        String consumerGroup, AbstractListener listner) {
+        this.topic = topic;
+        this.subExpression = subExpression;
+        this.consumerGroup = consumerGroup;
+        this.listner = listner;
+        this.nsAddr = nsAddr;
+    }
+
+    /**
+     * Sets only the subscription; listener, group and name server stay {@code null}.
+     *
+     * @param topic topic to subscribe to
+     * @param subExpression subscription expression
+     */
+    public AbstractMQConsumer(String topic, String subExpression) {
+        this.topic = topic;
+        this.subExpression = subExpression;
+    }
+
+    /** Turns debug mode on for this consumer and, if present, its listener. */
+    public void setDebug() {
+        setDebug(true);
+    }
+
+    /**
+     * Sets the debug flag on this consumer and, if present, its listener.
+     *
+     * @param isDebug new value of the flag
+     */
+    public void setDebug(boolean isDebug) {
+        if (listner != null) {
+            listner.setDebug(isDebug);
+        }
+
+        this.isDebug = isDebug;
+    }
+
+    /**
+     * Replaces the stored topic and subscription expression. Only the fields change;
+     * nothing is forwarded to a client here.
+     */
+    public void setSubscription(String topic, String subExpression) {
+        this.topic = topic;
+        this.subExpression = subExpression;
+    }
+
+    public AbstractListener getListner() {
+        return listner;
+    }
+
+    public void setListner(AbstractListener listner) {
+        this.listner = listner;
+    }
+
+    public String getNsAddr() {
+        return nsAddr;
+    }
+
+    public void setNsAddr(String nsAddr) {
+        this.nsAddr = nsAddr;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public String getSubExpression() {
+        return subExpression;
+    }
+
+    public void setSubExpression(String subExpression) {
+        this.subExpression = subExpression;
+    }
+
+    public String getConsumerGroup() {
+        return consumerGroup;
+    }
+
+    public void setConsumerGroup(String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+
+    /**
+     * Asks the listener to clear its messages. Does nothing when no listener is set,
+     * matching the null handling of {@link #setDebug(boolean)}.
+     */
+    public void clearMsg() {
+        if (listner != null) {
+            listner.clearMsg();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/clientinterface/AbstractMQProducer.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/rocketmq/test/clientinterface/AbstractMQProducer.java
 
b/test/src/main/java/org/apache/rocketmq/test/clientinterface/AbstractMQProducer.java
new file mode 100644
index 0000000..8201e63
--- /dev/null
+++ 
b/test/src/main/java/org/apache/rocketmq/test/clientinterface/AbstractMQProducer.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.clientinterface;
+
+import java.util.Date;
+import java.util.List;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.sendresult.SendResult;
+import org.apache.rocketmq.test.util.RandomUtil;
+import org.apache.rocketmq.test.util.TestUtil;
+
+/**
+ * Base class for test producers. Holds the topic, the naming settings and a shared
+ * {@link SendResult}, and provides convenience {@code send} overloads that all funnel
+ * into {@link MQProducer#send(Object, Object)} with a {@code null} second argument.
+ */
+public abstract class AbstractMQProducer extends MQCollector implements MQProducer {
+    protected String topic = null;
+
+    /** Single result instance available to subclasses for reporting send outcomes. */
+    protected SendResult sendResult = new SendResult();
+    protected boolean startSuccess = false;
+    protected String producerGroupName = null;
+    protected String producerInstanceName = null;
+    protected boolean isDebug = false;
+
+    /**
+     * Uses the no-arg collector setup and takes group and instance name from
+     * {@link RandomUtil#getStringByUUID()}.
+     *
+     * @param topic topic for messages generated by this class
+     */
+    public AbstractMQProducer(String topic) {
+        super();
+        producerGroupName = RandomUtil.getStringByUUID();
+        producerInstanceName = RandomUtil.getStringByUUID();
+        this.topic = topic;
+
+    }
+
+    /**
+     * Uses named collectors and takes group and instance name from
+     * {@link RandomUtil#getStringByUUID()}.
+     *
+     * @param topic topic for messages generated by this class
+     * @param originMsgCollector key passed on for the original-message collector
+     * @param msgBodyCollector key passed on for the message-body collector
+     */
+    public AbstractMQProducer(String topic, String originMsgCollector, String msgBodyCollector) {
+        super(originMsgCollector, msgBodyCollector);
+        producerGroupName = RandomUtil.getStringByUUID();
+        producerInstanceName = RandomUtil.getStringByUUID();
+        this.topic = topic;
+    }
+
+    public boolean isStartSuccess() {
+        return startSuccess;
+    }
+
+    public void setStartSuccess(boolean startSuccess) {
+        this.startSuccess = startSuccess;
+    }
+
+    public String getProducerInstanceName() {
+        return producerInstanceName;
+    }
+
+    public void setProducerInstanceName(String producerInstanceName) {
+        this.producerInstanceName = producerInstanceName;
+    }
+
+    public String getProducerGroupName() {
+        return producerGroupName;
+    }
+
+    public void setProducerGroupName(String producerGroupName) {
+        this.producerGroupName = producerGroupName;
+    }
+
+    /** Turns the debug flag on. */
+    public void setDebug() {
+        isDebug = true;
+    }
+
+    public void setDebug(boolean isDebug) {
+        this.isDebug = isDebug;
+    }
+
+    /** Turns the debug flag off. */
+    public void setRun() {
+        isDebug = false;
+    }
+
+    /**
+     * @return always {@code null} in this base class; subclasses may override
+     */
+    public List<MessageQueue> getMessageQueue() {
+        return null;
+    }
+
+    /** Generates a message without a tag. */
+    private Object getMessage() {
+        return this.getMessageByTag(null);
+    }
+
+    /**
+     * Generates a message for the configured topic whose body is a value from
+     * {@link RandomUtil#getStringByUUID()}, a dot and the current date's string form.
+     *
+     * @param tag tag to set on the message, or {@code null} for none
+     * @return the new message when this object is an {@link RMQNormalProducer};
+     *     {@code null} for any other subclass
+     */
+    public Object getMessageByTag(String tag) {
+        Object objMsg = null;
+        if (this instanceof RMQNormalProducer) {
+            org.apache.rocketmq.common.message.Message msg =
+                new org.apache.rocketmq.common.message.Message(
+                    topic, (RandomUtil.getStringByUUID() + "." + new Date()).getBytes());
+            objMsg = msg;
+            if (tag != null) {
+                msg.setTags(tag);
+            }
+        }
+        return objMsg;
+    }
+
+    /** Sends one generated, untagged message. */
+    public void send() {
+        Object msg = getMessage();
+        send(msg, null);
+    }
+
+    /** Sends {@code msg} with a {@code null} second argument. */
+    public void send(Object msg) {
+        send(msg, null);
+    }
+
+    /**
+     * Sends {@code msgNum} generated, untagged messages.
+     *
+     * @param msgNum number of messages; zero or negative sends nothing
+     */
+    public void send(long msgNum) {
+        // long counter: an int counter can never reach a bound above Integer.MAX_VALUE.
+        for (long i = 0; i < msgNum; i++) {
+            this.send();
+        }
+    }
+
+    /**
+     * Sends {@code msgNum} generated, untagged messages, calling
+     * {@code TestUtil.waitForMonment(intervalMills)} after each one.
+     *
+     * @param msgNum number of messages; zero or negative sends nothing
+     * @param intervalMills value handed to {@code TestUtil.waitForMonment}
+     */
+    public void send(long msgNum, int intervalMills) {
+        for (long i = 0; i < msgNum; i++) {
+            this.send();
+            TestUtil.waitForMonment(intervalMills);
+        }
+    }
+
+    /**
+     * Sends {@code msgSize} generated messages carrying {@code tag}.
+     *
+     * @param tag tag for every message, or {@code null} for none
+     * @param msgSize number of messages to send
+     */
+    public void send(String tag, int msgSize) {
+        for (int i = 0; i < msgSize; i++) {
+            Object msg = getMessageByTag(tag);
+            send(msg, null);
+        }
+    }
+
+    /**
+     * Sends {@code msgSize} generated messages carrying {@code tag}, calling
+     * {@code TestUtil.waitForMonment(intervalMills)} after each one.
+     *
+     * @param tag tag for every message, or {@code null} for none
+     * @param msgSize number of messages to send
+     * @param intervalMills value handed to {@code TestUtil.waitForMonment}
+     */
+    public void send(String tag, int msgSize, int intervalMills) {
+        for (int i = 0; i < msgSize; i++) {
+            Object msg = getMessageByTag(tag);
+            send(msg, null);
+            TestUtil.waitForMonment(intervalMills);
+        }
+    }
+
+    /** Sends every element of {@code msgs}, in list order. */
+    public void send(List<Object> msgs) {
+        for (Object msg : msgs) {
+            this.send(msg, null);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/clientinterface/MQCollector.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/rocketmq/test/clientinterface/MQCollector.java 
b/test/src/main/java/org/apache/rocketmq/test/clientinterface/MQCollector.java
new file mode 100644
index 0000000..42d4b62
--- /dev/null
+++ 
b/test/src/main/java/org/apache/rocketmq/test/clientinterface/MQCollector.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.clientinterface;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.rocketmq.test.util.RandomUtil;
+import org.apache.rocketmq.test.util.data.collect.DataCollector;
+import org.apache.rocketmq.test.util.data.collect.DataCollectorManager;
+
+/**
+ * Holds the collectors a test client records into: message bodies, original messages,
+ * failed messages, response times, plus an index map filled by subclasses.
+ *
+ * <p>Both constructors leave every collector and the index map non-null, so
+ * {@link #clearMsg()}, {@link #lockCollectors()} and the getters are safe to call on
+ * any instance.
+ */
+public abstract class MQCollector {
+    protected DataCollector msgBodys = null;
+    protected DataCollector originMsgs = null;
+    protected DataCollector errorMsgs = null;
+    protected Map<Object, Object> originMsgIndex = null;
+    /** Last snapshot produced by {@link #getMsgBodysCopy()}; {@code null} before that. */
+    protected Collection<Object> msgBodysCopy = null;
+
+    protected DataCollector msgRTs = null;
+
+    /**
+     * Fetches a list collector for each of the four collectors, each under a fresh key
+     * from {@link RandomUtil#getStringByUUID()}.
+     */
+    public MQCollector() {
+        msgBodys = newListCollector();
+        originMsgs = newListCollector();
+        initUnnamedCollectors();
+    }
+
+    /**
+     * Fetches the original-message and body collectors by the given keys. The error
+     * collector, response-time collector and index map are set up exactly as in the
+     * no-arg constructor (previously they stayed {@code null} here).
+     *
+     * @param originMsgCollector key of the collector for original messages
+     * @param msgBodyCollector key of the collector for message bodies
+     */
+    public MQCollector(String originMsgCollector, String msgBodyCollector) {
+        originMsgs = DataCollectorManager.getInstance().fetchDataCollector(originMsgCollector);
+        msgBodys = DataCollectorManager.getInstance().fetchDataCollector(msgBodyCollector);
+        initUnnamedCollectors();
+    }
+
+    public Collection<Object> getAllMsgBody() {
+        return msgBodys.getAllData();
+    }
+
+    public Collection<Object> getAllOriginMsg() {
+        return originMsgs.getAllData();
+    }
+
+    /**
+     * @return element 0 of the original-message data; the data is cast to {@link List},
+     *     so this fails if the collector returns another collection type or is empty
+     */
+    public Object getFirstMsg() {
+        return ((List<Object>) originMsgs.getAllData()).get(0);
+    }
+
+    public Collection<Object> getAllUndupMsgBody() {
+        return msgBodys.getAllDataWithoutDuplicate();
+    }
+
+    public Collection<Object> getAllUndupOriginMsg() {
+        // NOTE(review): unlike getAllUndupMsgBody() this returns getAllData(), i.e. no
+        // de-duplication despite the name. Left unchanged; confirm the intent.
+        return originMsgs.getAllData();
+    }
+
+    public Collection<Object> getSendErrorMsg() {
+        return errorMsgs.getAllData();
+    }
+
+    public Collection<Object> getMsgRTs() {
+        return msgRTs.getAllData();
+    }
+
+    public Map<Object, Object> getOriginMsgIndex() {
+        return originMsgIndex;
+    }
+
+    /**
+     * @return a new {@link ArrayList} holding the current body data; the same list is
+     *     also stored in {@code msgBodysCopy}
+     */
+    public Collection<Object> getMsgBodysCopy() {
+        msgBodysCopy = new ArrayList<Object>();
+        msgBodysCopy.addAll(msgBodys.getAllData());
+        return msgBodysCopy;
+    }
+
+    /** Resets all four collectors and empties the index map. */
+    public void clearMsg() {
+        msgBodys.resetData();
+        originMsgs.resetData();
+        errorMsgs.resetData();
+        originMsgIndex.clear();
+        msgRTs.resetData();
+    }
+
+    /** Calls {@code lockIncrement()} on all four collectors; the index map is untouched. */
+    public void lockCollectors() {
+        msgBodys.lockIncrement();
+        originMsgs.lockIncrement();
+        errorMsgs.lockIncrement();
+        msgRTs.lockIncrement();
+
+    }
+
+    /** Sets up the members that no constructor argument names. */
+    private void initUnnamedCollectors() {
+        errorMsgs = newListCollector();
+        originMsgIndex = new ConcurrentHashMap<Object, Object>();
+        msgRTs = newListCollector();
+    }
+
+    /** Fetches a list collector registered under a fresh key. */
+    private static DataCollector newListCollector() {
+        return DataCollectorManager.getInstance()
+            .fetchListDataCollector(RandomUtil.getStringByUUID());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/clientinterface/MQConsumer.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/rocketmq/test/clientinterface/MQConsumer.java 
b/test/src/main/java/org/apache/rocketmq/test/clientinterface/MQConsumer.java
new file mode 100644
index 0000000..aaa4b27
--- /dev/null
+++ 
b/test/src/main/java/org/apache/rocketmq/test/clientinterface/MQConsumer.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.clientinterface;
+
+/**
+ * Lifecycle contract shared by the test consumers: build the client, start it, stop it.
+ */
+public interface MQConsumer {
+    /** Builds and configures the underlying client without starting it. */
+    void create();
+
+    /** Starts the client built by {@link #create()}. */
+    void start();
+
+    /** Shuts the client down. */
+    void shutdown();
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/clientinterface/MQProducer.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/rocketmq/test/clientinterface/MQProducer.java 
b/test/src/main/java/org/apache/rocketmq/test/clientinterface/MQProducer.java
new file mode 100644
index 0000000..795457d
--- /dev/null
+++ 
b/test/src/main/java/org/apache/rocketmq/test/clientinterface/MQProducer.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.clientinterface;
+
+import org.apache.rocketmq.test.sendresult.SendResult;
+
+/**
+ * Minimal contract of the test producers: send one message, toggle debug mode, stop.
+ */
+public interface MQProducer {
+    /**
+     * Sends one message.
+     *
+     * @param msg the message to send
+     * @param arg extra argument whose meaning is left to the implementation
+     * @return the outcome of the send
+     */
+    SendResult send(Object msg, Object arg);
+
+    /** Switches debug mode on. */
+    void setDebug();
+
+    /** Switches debug mode off (AbstractMQProducer implements it by clearing its debug flag). */
+    void setRun();
+
+    /** Shuts the producer down. */
+    void shutdown();
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java 
b/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java
new file mode 100644
index 0000000..b5b3fdd
--- /dev/null
+++ b/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.factory;
+
+import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
+import org.apache.rocketmq.test.listener.AbstractListener;
+
+/**
+ * Static factory for ready-to-use test consumers: every consumer handed out has
+ * already had {@code create()} and {@code start()} called on it.
+ *
+ * <p>Note the argument order: the factory methods take the consumer group second,
+ * whereas the consumer constructors take it fourth.
+ */
+public class ConsumerFactory {
+
+    /**
+     * @param nsAddr name server address
+     * @param consumerGroup consumer group name
+     * @param topic topic to subscribe to
+     * @param subExpression subscription expression
+     * @param listner listener that receives the messages
+     * @return a created and started {@link RMQNormalConsumer}
+     */
+    public static RMQNormalConsumer getRMQNormalConsumer(String nsAddr, String consumerGroup,
+        String topic, String subExpression,
+        AbstractListener listner) {
+        return createAndStart(
+            new RMQNormalConsumer(nsAddr, topic, subExpression, consumerGroup, listner));
+    }
+
+    /**
+     * @param nsAddr name server address
+     * @param consumerGroup consumer group name
+     * @param topic topic to subscribe to
+     * @param subExpression subscription expression
+     * @param listner listener that receives the messages
+     * @return a created and started {@link RMQBroadCastConsumer}
+     */
+    public static RMQBroadCastConsumer getRMQBroadCastConsumer(String nsAddr, String consumerGroup,
+        String topic, String subExpression,
+        AbstractListener listner) {
+        return createAndStart(
+            new RMQBroadCastConsumer(nsAddr, topic, subExpression, consumerGroup, listner));
+    }
+
+    /** Runs the create/start sequence on {@code client} and hands the same object back. */
+    private static <T extends RMQNormalConsumer> T createAndStart(T client) {
+        client.create();
+        client.start();
+        return client;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/factory/MQMessageFactory.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/rocketmq/test/factory/MQMessageFactory.java 
b/test/src/main/java/org/apache/rocketmq/test/factory/MQMessageFactory.java
new file mode 100644
index 0000000..f998fcb
--- /dev/null
+++ b/test/src/main/java/org/apache/rocketmq/test/factory/MQMessageFactory.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.factory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.test.util.RandomUtil;
+
+public class MQMessageFactory {
+    private static Integer index = 0;
+
+    public static List<Object> getRMQMessage(String tag, String topic, int 
msgSize) {
+        List<Object> msgs = new ArrayList<Object>();
+        for (int i = 0; i < msgSize; i++) {
+            msgs.add(new Message(topic, tag, 
RandomUtil.getStringByUUID().getBytes()));
+        }
+
+        return msgs;
+    }
+
+    public static List<Object> getRMQMessage(List<String> tags, String topic, 
int msgSize) {
+        List<Object> msgs = new ArrayList<Object>();
+        for (int i = 0; i < msgSize; i++) {
+            for (String tag : tags) {
+                msgs.add(new Message(topic, tag, 
RandomUtil.getStringByUUID().getBytes()));
+            }
+        }
+        return msgs;
+    }
+
+    public static List<Object> getMessageBody(List<Object> msgs) {
+        List<Object> msgBodys = new ArrayList<Object>();
+        for (Object msg : msgs) {
+            msgBodys.add(new String(((Message) msg).getBody()));
+        }
+
+        return msgBodys;
+    }
+
+    public static Collection<Object> getMessage(Collection<Object>... msgs) {
+        Collection<Object> allMsgs = new ArrayList<Object>();
+        for (Collection<Object> msg : msgs) {
+            allMsgs.addAll(msg);
+        }
+        return allMsgs;
+    }
+
+    public static List<Object> getDelayMsg(String topic, int delayLevel, int 
msgSize) {
+        List<Object> msgs = new ArrayList<Object>();
+        for (int i = 0; i < msgSize; i++) {
+            Message msg = new Message(topic, 
RandomUtil.getStringByUUID().getBytes());
+            msg.setDelayTimeLevel(delayLevel);
+            msgs.add(msg);
+        }
+        return msgs;
+    }
+
+    public static List<Object> getKeyMsg(String topic, String key, int 
msgSize) {
+        List<Object> msgs = new ArrayList<Object>();
+        for (int i = 0; i < msgSize; i++) {
+            Message msg = new Message(topic, null, key, 
RandomUtil.getStringByUUID().getBytes());
+            msgs.add(msg);
+        }
+        return msgs;
+    }
+
+    public static Map<MessageQueue, List<Object>> getMsgByMQ(MessageQueue mq, 
int msgSize) {
+        List<MessageQueue> mqs = new ArrayList<MessageQueue>();
+        mqs.add(mq);
+        return getMsgByMQ(mqs, msgSize);
+    }
+
+    public static Map<MessageQueue, List<Object>> 
getMsgByMQ(List<MessageQueue> mqs, int msgSize) {
+        return getMsgByMQ(mqs, msgSize, null);
+    }
+
+    /**
+     * Generates {@code msgSize} messages for every queue in {@code mqs},
+     * using each queue's topic and the given tag.
+     *
+     * @param mqs queues to generate messages for
+     * @param msgSize number of messages per queue
+     * @param tag tag forwarded to {@code getMsg}; may be {@code null}
+     * @return generated messages keyed by queue
+     */
+    public static Map<MessageQueue, List<Object>> getMsgByMQ(
+        List<MessageQueue> mqs, int msgSize,
+        String tag) {
+        Map<MessageQueue, List<Object>> perQueue =
+            new HashMap<MessageQueue, List<Object>>();
+        for (MessageQueue queue : mqs) {
+            List<Object> batch = getMsg(queue.getTopic(), msgSize, tag);
+            perQueue.put(queue, batch);
+        }
+        return perQueue;
+    }
+
+    /**
+     * Generates {@code msgSize} untagged messages for {@code topic} by
+     * delegating to the tag-aware overload with a null tag.
+     *
+     * @param topic topic set on every message
+     * @param msgSize number of messages to create
+     * @return the created messages
+     */
+    public static List<Object> getMsg(String topic, int msgSize) {
+        final String noTag = null;
+        return getMsg(topic, msgSize, noTag);
+    }
+
+    public static List<Object> getMsg(String topic, int msgSize, String tag) {
+        List<Object> msgs = new ArrayList<Object>();
+        while (msgSize > 0) {
+            Message msg = new Message(topic, (index++).toString().getBytes());
+            if (tag != null) {
+                msg.setTags(tag);
+            }
+            msgs.add(msg);
+            msgSize--;
+        }
+
+        return msgs;
+    }
+
+    /**
+     * Exposes the given queues as a list.
+     *
+     * @param mq queues to wrap
+     * @return the fixed-size list view produced by {@code Arrays.asList}
+     */
+    public static List<MessageQueue> getMessageQueues(MessageQueue... mq) {
+        List<MessageQueue> queues = Arrays.asList(mq);
+        return queues;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/factory/MessageFactory.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/rocketmq/test/factory/MessageFactory.java 
b/test/src/main/java/org/apache/rocketmq/test/factory/MessageFactory.java
new file mode 100644
index 0000000..8f71700
--- /dev/null
+++ b/test/src/main/java/org/apache/rocketmq/test/factory/MessageFactory.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.factory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.test.util.RandomUtils;
+
+public class MessageFactory {
+
+    public static Message getRandomMessage(String topic) {
+        return getStringMessage(topic, RandomUtils.getStringByUUID());
+    }
+
+    public static Message getStringMessage(String topic, String body) {
+        Message msg = new Message(topic, body.getBytes());
+        return msg;
+    }
+
+    public static Message getStringMessageByTag(String topic, String tags, 
String body) {
+        Message msg = new Message(topic, tags, body.getBytes());
+        return msg;
+    }
+
+    public static Message getRandomMessageByTag(String topic, String tags) {
+        return getStringMessageByTag(topic, tags, 
RandomUtils.getStringByUUID());
+    }
+
+    public static Collection<Message> getRandomMessageList(String topic, int 
size) {
+        List<Message> msgList = new ArrayList<Message>();
+        for (int i = 0; i < size; i++) {
+            msgList.add(getRandomMessage(topic));
+        }
+        return msgList;
+    }
+
+    public static Collection<Message> getRandomMessageListByTag(String topic, 
String tags, int size) {
+        List<Message> msgList = new ArrayList<Message>();
+        for (int i = 0; i < size; i++) {
+            msgList.add(getRandomMessageByTag(topic, tags));
+        }
+        return msgList;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/factory/ProducerFactory.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/rocketmq/test/factory/ProducerFactory.java 
b/test/src/main/java/org/apache/rocketmq/test/factory/ProducerFactory.java
new file mode 100644
index 0000000..66767cc
--- /dev/null
+++ b/test/src/main/java/org/apache/rocketmq/test/factory/ProducerFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.factory;
+
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.test.util.RandomUtil;
+
+/**
+ * Creates {@link DefaultMQProducer} instances for tests.
+ */
+public class ProducerFactory {
+
+    /**
+     * Creates a producer constructed with a fresh
+     * {@code RandomUtil.getStringByUUID()} value, sets its name server
+     * address to {@code ns} and attempts to start it.
+     *
+     * @param ns name server address handed to {@code setNamesrvAddr}
+     * @return the producer; it is returned even when {@code start()} threw
+     */
+    public static DefaultMQProducer getRMQProducer(String ns) {
+        String producerGroup = RandomUtil.getStringByUUID();
+        DefaultMQProducer rmqProducer = new DefaultMQProducer(producerGroup);
+        rmqProducer.setNamesrvAddr(ns);
+        try {
+            rmqProducer.start();
+        } catch (MQClientException startFailure) {
+            // NOTE(review): a failed start is only printed; callers still
+            // receive the producer and cannot tell it did not start.
+            startFailure.printStackTrace();
+        }
+
+        return rmqProducer;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/factory/SendCallBackFactory.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/rocketmq/test/factory/SendCallBackFactory.java 
b/test/src/main/java/org/apache/rocketmq/test/factory/SendCallBackFactory.java
new file mode 100644
index 0000000..64764c6
--- /dev/null
+++ 
b/test/src/main/java/org/apache/rocketmq/test/factory/SendCallBackFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.factory;
+
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+
+public class SendCallBackFactory {
+    public static SendCallback getSendCallBack() {
+        return new SendCallback() {
+            @Override
+            public void onSuccess(SendResult sendResult) {
+            }
+
+            @Override
+            public void onException(Throwable throwable) {
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/factory/TagMessage.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/rocketmq/test/factory/TagMessage.java 
b/test/src/main/java/org/apache/rocketmq/test/factory/TagMessage.java
new file mode 100644
index 0000000..b7eb6a6
--- /dev/null
+++ b/test/src/main/java/org/apache/rocketmq/test/factory/TagMessage.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.factory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Holds, for one topic, {@code msgSize} pre-generated messages per tag and
+ * offers lookups of the messages or their String bodies by tag. The messages
+ * are generated once, in the constructor, through
+ * {@code MQMessageFactory.getRMQMessage}.
+ */
+public class TagMessage {
+    private final List<String> tags;
+    private final String topic;
+    private final int msgSize;
+    /** Generated messages keyed by tag. */
+    private final Map<String, List<Object>> rmqMsgs =
+        new HashMap<String, List<Object>>();
+
+    /**
+     * @param tag the only tag to generate messages for
+     * @param topic topic of every generated message
+     * @param msgSize number of messages generated for the tag
+     */
+    public TagMessage(String tag, String topic, int msgSize) {
+        this(Arrays.asList(new String[] {tag}), topic, msgSize);
+    }
+
+    /**
+     * @param tags tags to generate messages for
+     * @param topic topic of every generated message
+     * @param msgSize number of messages generated per tag
+     */
+    public TagMessage(String[] tags, String topic, int msgSize) {
+        this(Arrays.asList(tags), topic, msgSize);
+    }
+
+    /**
+     * @param tags tags to generate messages for; the list is kept by
+     *     reference, not copied
+     * @param topic topic of every generated message
+     * @param msgSize number of messages generated per tag
+     */
+    public TagMessage(List<String> tags, String topic, int msgSize) {
+        this.tags = tags;
+        this.topic = topic;
+        this.msgSize = msgSize;
+
+        init();
+    }
+
+    /** Generates and stores the messages of every tag. */
+    private void init() {
+        for (String tag : tags) {
+            List<Object> tagMsgs =
+                MQMessageFactory.getRMQMessage(tag, topic, msgSize);
+            rmqMsgs.put(tag, tagMsgs);
+        }
+    }
+
+    /**
+     * @param tag tag to look up
+     * @return the stored messages of {@code tag}, or a new empty list when
+     *     the tag is not one of this instance's tags
+     */
+    public List<Object> getMessageByTag(String tag) {
+        if (tags.contains(tag)) {
+            return rmqMsgs.get(tag);
+        } else {
+            return new ArrayList<Object>();
+        }
+    }
+
+    /**
+     * @return all messages interleaved by tag: the first message of every
+     *     tag, then the second of every tag, and so on
+     */
+    public List<Object> getMixedTagMessages() {
+        List<Object> mixedMsgs = new ArrayList<Object>();
+        for (int i = 0; i < msgSize; i++) {
+            for (String tag : tags) {
+                mixedMsgs.add(rmqMsgs.get(tag).get(i));
+            }
+        }
+
+        return mixedMsgs;
+    }
+
+    /**
+     * @param tag tag to look up
+     * @return the String bodies of the messages of {@code tag}, or a new
+     *     empty list when the tag is not one of this instance's tags
+     */
+    public List<Object> getMessageBodyByTag(String tag) {
+        if (tags.contains(tag)) {
+            return MQMessageFactory.getMessageBody(rmqMsgs.get(tag));
+        } else {
+            return new ArrayList<Object>();
+        }
+    }
+
+    /**
+     * @param tag tags to look up
+     * @return the bodies of all given tags, concatenated in argument order
+     */
+    public List<Object> getMessageBodyByTag(String... tag) {
+        return this.getMessageBodyByTag(Arrays.asList(tag));
+    }
+
+    /**
+     * @param tags tags to look up
+     * @return the bodies of all given tags, concatenated in list order;
+     *     tags unknown to this instance contribute nothing
+     */
+    public List<Object> getMessageBodyByTag(List<String> tags) {
+        List<Object> msgBodys = new ArrayList<Object>();
+        for (String tag : tags) {
+            // Go through the single-tag lookup so an unknown tag yields an
+            // empty list instead of handing null to getMessageBody.
+            msgBodys.addAll(getMessageBodyByTag(tag));
+        }
+        return msgBodys;
+    }
+
+    /**
+     * @return the bodies of every tag of this instance, grouped by tag in
+     *     the order of the tag list
+     */
+    public List<Object> getAllTagMessageBody() {
+        return getMessageBodyByTag(tags);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/listener/AbstractListener.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/rocketmq/test/listener/AbstractListener.java 
b/test/src/main/java/org/apache/rocketmq/test/listener/AbstractListener.java
new file mode 100644
index 0000000..974434a
--- /dev/null
+++ b/test/src/main/java/org/apache/rocketmq/test/listener/AbstractListener.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.listener;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.client.consumer.listener.MessageListener;
+import org.apache.rocketmq.test.clientinterface.MQCollector;
+import org.apache.rocketmq.test.util.TestUtil;
+
+public class AbstractListener extends MQCollector implements MessageListener {
+    public static Logger logger = Logger.getLogger(AbstractListener.class);
+    protected boolean isDebug = false;
+    protected String listnerName = null;
+    protected Collection<Object> allSendMsgs = null;
+
+    public AbstractListener() {
+        super();
+    }
+
+    public AbstractListener(String listnerName) {
+        super();
+        this.listnerName = listnerName;
+    }
+
+    public AbstractListener(String originMsgCollector, String 
msgBodyCollector) {
+        super(originMsgCollector, msgBodyCollector);
+    }
+
+    public boolean isDebug() {
+        return isDebug;
+    }
+
+    public void setDebug(boolean debug) {
+        isDebug = debug;
+    }
+
+    public void waitForMessageConsume(int timeoutMills) {
+        TestUtil.waitForMonment(timeoutMills);
+    }
+
+    public void stopRecv() {
+        super.lockCollectors();
+    }
+
+    public Collection<Object> waitForMessageConsume(Collection<Object> 
allSendMsgs,
+        int timeoutMills) {
+        this.allSendMsgs = allSendMsgs;
+        List<Object> sendMsgs = new ArrayList<Object>();
+        sendMsgs.addAll(allSendMsgs);
+
+        long curTime = System.currentTimeMillis();
+        while (!sendMsgs.isEmpty()) {
+            Iterator<Object> iter = sendMsgs.iterator();
+            while (iter.hasNext()) {
+                Object msg = iter.next();
+                if (msgBodys.getAllData().contains(msg)) {
+                    iter.remove();
+                }
+            }
+            if (sendMsgs.isEmpty()) {
+                break;
+            } else {
+                if (System.currentTimeMillis() - curTime >= timeoutMills) {
+                    logger.error(String.format("timeout but  [%s]  not recv 
all send messages!",
+                        listnerName));
+                    break;
+                } else {
+                    logger.info(String.format("[%s] still [%s] msg not recv!", 
listnerName,
+                        sendMsgs.size()));
+                    TestUtil.waitForMonment(500);
+                }
+            }
+        }
+
+        return sendMsgs;
+    }
+
+    public void waitForMessageConsume(Map<Object, Object> sendMsgIndex, int 
timeoutMills) {
+        Collection<Object> notRecvMsgs = 
waitForMessageConsume(sendMsgIndex.keySet(), timeoutMills);
+        for (Object object : notRecvMsgs) {
+            logger.info(sendMsgIndex.get(object));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/listener/rmq/concurrent/RMQDelayListner.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/rocketmq/test/listener/rmq/concurrent/RMQDelayListner.java
 
b/test/src/main/java/org/apache/rocketmq/test/listener/rmq/concurrent/RMQDelayListner.java
new file mode 100644
index 0000000..b4a0870
--- /dev/null
+++ 
b/test/src/main/java/org/apache/rocketmq/test/listener/rmq/concurrent/RMQDelayListner.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.listener.rmq.concurrent;
+
+import java.util.Collection;
+import java.util.List;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import 
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.test.listener.AbstractListener;
+import org.apache.rocketmq.test.util.RandomUtil;
+import org.apache.rocketmq.test.util.data.collect.DataCollector;
+import org.apache.rocketmq.test.util.data.collect.DataCollectorManager;
+
+public class RMQDelayListner extends AbstractListener implements 
MessageListenerConcurrently {
+    private DataCollector msgDelayTimes = null;
+
+    public RMQDelayListner() {
+        msgDelayTimes = DataCollectorManager.getInstance()
+            .fetchDataCollector(RandomUtil.getStringByUUID());
+    }
+
+    public Collection<Object> getMsgDelayTimes() {
+        return msgDelayTimes.getAllData();
+    }
+
+    public void resetMsgDelayTimes() {
+        msgDelayTimes.resetData();
+    }
+
+    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
+        ConsumeConcurrentlyContext consumeConcurrentlyContext) {
+        long recvTime = System.currentTimeMillis();
+        for (MessageExt msg : msgs) {
+            if (isDebug) {
+                logger.info(listnerName + ":" + msg);
+            }
+
+            msgBodys.addData(new String(msg.getBody()));
+            originMsgs.addData(msg);
+            msgDelayTimes.addData(Math.abs(recvTime - msg.getBornTimestamp()));
+        }
+        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/listener/rmq/concurrent/RMQNormalListner.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/rocketmq/test/listener/rmq/concurrent/RMQNormalListner.java
 
b/test/src/main/java/org/apache/rocketmq/test/listener/rmq/concurrent/RMQNormalListner.java
new file mode 100644
index 0000000..0d40881
--- /dev/null
+++ 
b/test/src/main/java/org/apache/rocketmq/test/listener/rmq/concurrent/RMQNormalListner.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.listener.rmq.concurrent;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import 
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.test.listener.AbstractListener;
+
+public class RMQNormalListner extends AbstractListener implements 
MessageListenerConcurrently {
+    private ConsumeConcurrentlyStatus consumeStatus = 
ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+    private AtomicInteger msgIndex = new AtomicInteger(0);
+
+    public RMQNormalListner() {
+        super();
+    }
+
+    public RMQNormalListner(String listnerName) {
+        super(listnerName);
+    }
+
+    public RMQNormalListner(ConsumeConcurrentlyStatus consumeStatus) {
+        super();
+        this.consumeStatus = consumeStatus;
+    }
+
+    public RMQNormalListner(String originMsgCollector, String 
msgBodyCollector) {
+        super(originMsgCollector, msgBodyCollector);
+    }
+
+    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
+        ConsumeConcurrentlyContext consumeConcurrentlyContext) {
+        for (MessageExt msg : msgs) {
+            msgIndex.getAndIncrement();
+            if (isDebug) {
+                if (listnerName != null && listnerName != "") {
+                    logger.info(listnerName + ":" + msgIndex.get() + ":"
+                        + String.format("msgid:%s broker:%s queueId:%s 
offset:%s",
+                        msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(),
+                        msg.getQueueOffset()));
+                } else {
+                    logger.info(msg);
+                }
+            }
+
+            msgBodys.addData(new String(msg.getBody()));
+            originMsgs.addData(msg);
+            originMsgIndex.put(new String(msg.getBody()), msg);
+        }
+        return consumeStatus;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/listener/rmq/order/RMQOrderListener.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/rocketmq/test/listener/rmq/order/RMQOrderListener.java
 
b/test/src/main/java/org/apache/rocketmq/test/listener/rmq/order/RMQOrderListener.java
new file mode 100644
index 0000000..91883d8
--- /dev/null
+++ 
b/test/src/main/java/org/apache/rocketmq/test/listener/rmq/order/RMQOrderListener.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.listener.rmq.order;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.test.listener.AbstractListener;
+
+/**
+ * Orderly listener that, in addition to the inherited collectors, keeps the
+ * received bodies grouped per queue (queue id plus store host) in arrival
+ * order.
+ */
+public class RMQOrderListener extends AbstractListener implements
+    MessageListenerOrderly {
+    private Map<String/* queueId_storeHost */, Collection<Object>> msgs =
+        new ConcurrentHashMap<String, Collection<Object>>();
+
+    public RMQOrderListener() {
+        super();
+    }
+
+    /**
+     * @param listnerName name used in this listener's log output
+     */
+    public RMQOrderListener(String listnerName) {
+        super(listnerName);
+    }
+
+    /**
+     * Forwards both collector names to the parent constructor.
+     */
+    public RMQOrderListener(String originMsgCollector,
+        String msgBodyCollector) {
+        super(originMsgCollector, msgBodyCollector);
+    }
+
+    /**
+     * @return one collection of bodies per queue, each in arrival order
+     */
+    public Collection<Collection<Object>> getMsgs() {
+        return msgs.values();
+    }
+
+    /** Appends the body of {@code msg} to the group of its queue. */
+    private void putMsg(MessageExt msg) {
+        String key = getKey(msg.getQueueId(), msg.getStoreHost().toString());
+        Collection<Object> msgQueue = msgs.get(key);
+        if (msgQueue == null) {
+            msgQueue = new ArrayList<Object>();
+        }
+
+        msgQueue.add(new String(msg.getBody()));
+        msgs.put(key, msgQueue);
+    }
+
+    /**
+     * @param queueId queue id of the message
+     * @param brokerIp string form of the message's store host
+     * @return the grouping key {@code queueId_brokerIp}
+     */
+    private String getKey(int queueId, String brokerIp) {
+        return String.format("%s_%s", queueId, brokerIp);
+    }
+
+    /**
+     * Records every message of the batch and, when debug is on, logs it.
+     * Note that the parameter {@code msgs} shadows the field of the same
+     * name inside this method.
+     *
+     * @param msgs batch handed over by the consumer
+     * @param context unused
+     * @return always {@code SUCCESS}
+     */
+    @Override
+    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
+        ConsumeOrderlyContext context) {
+        for (MessageExt msg : msgs) {
+            if (isDebug) {
+                // Compare content, not references: "" != "" can be true for
+                // two distinct empty String objects.
+                if (listnerName != null && !listnerName.isEmpty()) {
+                    logger.info(listnerName + ": " + msg);
+                } else {
+                    logger.info(msg);
+                }
+            }
+
+            putMsg(msg);
+            msgBodys.addData(new String(msg.getBody()));
+            originMsgs.addData(msg);
+        }
+
+        return ConsumeOrderlyStatus.SUCCESS;
+    }
+}


Reply via email to