http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/message/MessageQueueMsg.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/rocketmq/test/message/MessageQueueMsg.java 
b/test/src/main/java/org/apache/rocketmq/test/message/MessageQueueMsg.java
new file mode 100644
index 0000000..38c2b6b
--- /dev/null
+++ b/test/src/main/java/org/apache/rocketmq/test/message/MessageQueueMsg.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.message;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.test.factory.MQMessageFactory;
+
+public class MessageQueueMsg {
+    private Map<MessageQueue, List<Object>> msgsWithMQ = null;
+    private Map<Integer, List<Object>> msgsWithMQId = null;
+    private Collection<Object> msgBodys = null;
+
+    public MessageQueueMsg(List<MessageQueue> mqs, int msgSize) {
+        this(mqs, msgSize, null);
+    }
+
+    public MessageQueueMsg(List<MessageQueue> mqs, int msgSize, String tag) {
+        msgsWithMQ = MQMessageFactory.getMsgByMQ(mqs, msgSize, tag);
+        msgsWithMQId = new HashMap<Integer, List<Object>>();
+        msgBodys = new ArrayList<Object>();
+        init();
+    }
+
+    public Map<MessageQueue, List<Object>> getMsgsWithMQ() {
+        return msgsWithMQ;
+    }
+
+    public Map<Integer, List<Object>> getMsgWithMQId() {
+        return msgsWithMQId;
+    }
+
+    public Collection<Object> getMsgBodys() {
+        return msgBodys;
+    }
+
+    private void init() {
+        for (MessageQueue mq : msgsWithMQ.keySet()) {
+            msgsWithMQId.put(mq.getQueueId(), msgsWithMQ.get(mq));
+            
msgBodys.addAll(MQMessageFactory.getMessageBody(msgsWithMQ.get(mq)));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/sendresult/SendResult.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/rocketmq/test/sendresult/SendResult.java 
b/test/src/main/java/org/apache/rocketmq/test/sendresult/SendResult.java
new file mode 100644
index 0000000..d53ee7d
--- /dev/null
+++ b/test/src/main/java/org/apache/rocketmq/test/sendresult/SendResult.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.sendresult;
+
+public class SendResult {
+    private boolean sendResult = false;
+    private String msgId = null;
+    private Exception sendException = null;
+    private String brokerIp = null;
+
+    public String getBrokerIp() {
+        return brokerIp;
+    }
+
+    public void setBrokerIp(String brokerIp) {
+        this.brokerIp = brokerIp;
+    }
+
+    public boolean isSendResult() {
+        return sendResult;
+    }
+
+    public void setSendResult(boolean sendResult) {
+        this.sendResult = sendResult;
+    }
+
+    public String getMsgId() {
+        return msgId;
+    }
+
+    public void setMsgId(String msgId) {
+        this.msgId = msgId;
+    }
+
+    public Exception getSendException() {
+        return sendException;
+    }
+
+    public void setSendException(Exception sendException) {
+        this.sendException = sendException;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("sendstatus:%s msgId:%s", sendResult, msgId);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/util/Condition.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/rocketmq/test/util/Condition.java 
b/test/src/main/java/org/apache/rocketmq/test/util/Condition.java
new file mode 100644
index 0000000..3c5f403
--- /dev/null
+++ b/test/src/main/java/org/apache/rocketmq/test/util/Condition.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.util;
+
+public interface Condition {
+    boolean meetCondition();
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/util/DuplicateMessageInfo.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/rocketmq/test/util/DuplicateMessageInfo.java 
b/test/src/main/java/org/apache/rocketmq/test/util/DuplicateMessageInfo.java
new file mode 100644
index 0000000..8bd93b6
--- /dev/null
+++ b/test/src/main/java/org/apache/rocketmq/test/util/DuplicateMessageInfo.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.util;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.text.DecimalFormat;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+public class DuplicateMessageInfo<T> {
+
+    public void checkDuplicatedMessageInfo(boolean bPrintLog,
+        List<List<T>> lQueueList) throws IOException {
+        int msgListSize = lQueueList.size();
+        int maxmsgList = 0;
+        Map<T, Integer> msgIdMap = new HashMap<T, Integer>();
+        Map<Integer, Integer> dupMsgMap = new HashMap<Integer, Integer>();
+
+        for (int i = 0; i < msgListSize; i++) {
+            if (maxmsgList < lQueueList.get(i).size())
+                maxmsgList = lQueueList.get(i).size();
+        }
+
+        List<StringBuilder> strBQueue = new LinkedList<StringBuilder>();
+        for (int i = 0; i < msgListSize; i++)
+            strBQueue.add(new StringBuilder());
+
+        for (int msgListIndex = 0; msgListIndex < maxmsgList; msgListIndex++) {
+            for (int msgQueueListIndex = 0; msgQueueListIndex < msgListSize; 
msgQueueListIndex++) {
+                if (msgListIndex < lQueueList.get(msgQueueListIndex).size()) {
+                    if 
(msgIdMap.containsKey(lQueueList.get(msgQueueListIndex).get(msgListIndex))) {
+                        if (dupMsgMap.containsKey(msgQueueListIndex)) {
+                            int dupMsgCount = dupMsgMap.get(msgQueueListIndex);
+                            dupMsgCount++;
+                            dupMsgMap.remove(msgQueueListIndex);
+                            dupMsgMap.put(msgQueueListIndex, dupMsgCount);
+                        } else {
+                            dupMsgMap.put(msgQueueListIndex, 1);
+                        }
+
+                        strBQueue.get(msgQueueListIndex).append("" + 
msgQueueListIndex + "\t" +
+                            
msgIdMap.get(lQueueList.get(msgQueueListIndex).get(msgListIndex)) + "\t"
+                            + 
lQueueList.get(msgQueueListIndex).get(msgListIndex) + "\r\n");
+                    } else {
+                        
msgIdMap.put(lQueueList.get(msgQueueListIndex).get(msgListIndex), 
msgQueueListIndex);
+                    }
+                }
+            }
+        }
+
+        int msgTotalNum = getMsgTotalNumber(lQueueList);
+        int msgTotalDupNum = getDuplicateMsgNum(dupMsgMap);
+        int msgNoDupNum = msgTotalNum - msgTotalDupNum;
+        float msgDupRate = ((float) msgTotalDupNum / (float) msgTotalNum) * 
100.0f;
+        StringBuilder strBuilder = new StringBuilder();
+
+        strBuilder.append("msgTotalNum:" + msgTotalNum + "\r\n");
+        strBuilder.append("msgTotalDupNum:" + msgTotalDupNum + "\r\n");
+        strBuilder.append("msgNoDupNum:" + msgNoDupNum + "\r\n");
+        strBuilder.append("msgDupRate" + getFloatNumString(msgDupRate) + 
"%\r\n");
+
+        strBuilder.append("queue\tmsg(dupNum/dupRate)\tdupRate\r\n");
+        for (int i = 0; i < dupMsgMap.size(); i++) {
+            int msgDupNum = dupMsgMap.get(i);
+            int msgNum = lQueueList.get(i).size();
+            float msgQueueDupRate = ((float) msgDupNum / (float) 
msgTotalDupNum) * 100.0f;
+            float msgQueueInnerDupRate = ((float) msgDupNum / (float) msgNum) 
* 100.0f;
+
+            strBuilder.append(i + "\t" + msgDupNum + "/" + 
getFloatNumString(msgQueueDupRate) + "%" + "\t\t" +
+                getFloatNumString(msgQueueInnerDupRate) + "%\r\n");
+        }
+
+        System.out.print(strBuilder.toString());
+        String titleString = "queue\tdupQueue\tdupMsg\r\n";
+        System.out.print(titleString);
+
+        for (int i = 0; i < msgListSize; i++)
+            System.out.print(strBQueue.get(i).toString());
+
+        if (bPrintLog) {
+            String logFileNameStr = "D:" + File.separator + 
"checkDuplicatedMessageInfo.txt";
+            File logFileNameFile = new File(logFileNameStr);
+            OutputStream out = new FileOutputStream(logFileNameFile, true);
+
+            String strToWrite;
+            byte[] byteToWrite;
+            strToWrite = strBuilder.toString() + titleString;
+            for (int i = 0; i < msgListSize; i++)
+                strToWrite += strBQueue.get(i).toString() + "\r\n";
+
+            byteToWrite = strToWrite.getBytes();
+            out.write(byteToWrite);
+            out.close();
+        }
+    }
+
+    private int getMsgTotalNumber(List<List<T>> lQueueList) {
+        int msgTotalNum = 0;
+        for (int i = 0; i < lQueueList.size(); i++) {
+            msgTotalNum += lQueueList.get(i).size();
+        }
+        return msgTotalNum;
+    }
+
+    private int getDuplicateMsgNum(Map<Integer, Integer> msgDupMap) {
+        int msgDupNum = 0;
+        for (int i = 0; i < msgDupMap.size(); i++) {
+            msgDupNum += msgDupMap.get(i);
+        }
+        return msgDupNum;
+    }
+
+    private String getFloatNumString(float fNum) {
+        DecimalFormat dcmFmt = new DecimalFormat("0.00");
+        return dcmFmt.format(fNum);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/util/FileUtil.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/rocketmq/test/util/FileUtil.java 
b/test/src/main/java/org/apache/rocketmq/test/util/FileUtil.java
new file mode 100644
index 0000000..44db782
--- /dev/null
+++ b/test/src/main/java/org/apache/rocketmq/test/util/FileUtil.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.util;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Properties;
+
+public class FileUtil {
+    private static String lineSeperator = System.getProperty("line.separator");
+
+    private String filePath = "";
+    private String fileName = "";
+
+    public FileUtil(String filePath, String fileName) {
+        this.filePath = filePath;
+        this.fileName = fileName;
+    }
+
+    public static void main(String args[]) {
+        String filePath = FileUtil.class.getResource("/").getPath();
+        String fileName = "test.txt";
+        FileUtil fileUtil = new FileUtil(filePath, fileName);
+        Properties properties = new Properties();
+        properties.put("xx", "yy");
+        properties.put("yy", "xx");
+        fileUtil.writeProperties(properties);
+    }
+
+    public void deleteFile() {
+        File file = new File(filePath + File.separator + fileName);
+        if (file.exists()) {
+            file.delete();
+        }
+    }
+
+    public void appendFile(String content) {
+        File file = openFile();
+        String newContent = lineSeperator + content;
+        writeFile(file, newContent, true);
+    }
+
+    public void coverFile(String content) {
+        File file = openFile();
+        writeFile(file, content, false);
+    }
+
+    public void writeProperties(Properties properties) {
+        String content = getPropertiesAsString(properties);
+        this.coverFile(content);
+    }
+
+    private String getPropertiesAsString(Properties properties) {
+        StringBuilder sb = new StringBuilder();
+        for (Object key : properties.keySet()) {
+            sb.append(key).append("=").append(properties.getProperty((String) 
key))
+                .append(lineSeperator);
+        }
+        return sb.toString();
+    }
+
+    private void writeFile(File file, String content, boolean append) {
+        FileWriter writer = null;
+        try {
+            writer = new FileWriter(file.getAbsoluteFile(), append);
+            writer.write(content);
+            writer.flush();
+        } catch (IOException e) {
+            e.printStackTrace();
+        } finally {
+            if (writer != null) {
+                try {
+                    writer.close();
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+
+    private File openFile() {
+        File file = new File(filePath + File.separator + fileName);
+        if (!file.exists()) {
+            try {
+                file.createNewFile();
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+        return file;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/util/MQAdmin.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/rocketmq/test/util/MQAdmin.java 
b/test/src/main/java/org/apache/rocketmq/test/util/MQAdmin.java
new file mode 100644
index 0000000..c3e0572
--- /dev/null
+++ b/test/src/main/java/org/apache/rocketmq/test/util/MQAdmin.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.util;
+
+import java.util.HashMap;
+import java.util.Set;
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.common.admin.TopicStatsTable;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+
+public class MQAdmin {
+    private static Logger log = Logger.getLogger(MQAdmin.class);
+
+    public static boolean createTopic(String nameSrvAddr, String clusterName, 
String topic,
+        int queueNum) {
+        int defaultWaitTime = 5;
+        return createTopic(nameSrvAddr, clusterName, topic, queueNum, 
defaultWaitTime);
+    }
+
+    public static boolean createTopic(String nameSrvAddr, String clusterName, 
String topic,
+        int queueNum, int waitTimeSec) {
+        boolean createResult = false;
+        DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt();
+        mqAdminExt.setNamesrvAddr(nameSrvAddr);
+        try {
+            mqAdminExt.start();
+            mqAdminExt.createTopic(clusterName, topic, queueNum);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
+        long startTime = System.currentTimeMillis();
+        while (!createResult) {
+            createResult = checkTopicExist(mqAdminExt, topic);
+            if (System.currentTimeMillis() - startTime < waitTimeSec * 1000) {
+                TestUtils.waitForMonment(100);
+            } else {
+                log.error(String.format("timeout,but create topic[%s] 
failed!", topic));
+                break;
+            }
+        }
+
+        mqAdminExt.shutdown();
+        return createResult;
+    }
+
+    private static boolean checkTopicExist(DefaultMQAdminExt mqAdminExt, 
String topic) {
+        boolean createResult = false;
+        try {
+            TopicStatsTable topicInfo = mqAdminExt.examineTopicStats(topic);
+            createResult = !topicInfo.getOffsetTable().isEmpty();
+        } catch (Exception e) {
+        }
+
+        return createResult;
+    }
+
+    public static boolean createSub(String nameSrvAddr, String clusterName, 
String consumerId) {
+        boolean createResult = true;
+        DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt();
+        mqAdminExt.setNamesrvAddr(nameSrvAddr);
+        SubscriptionGroupConfig config = new SubscriptionGroupConfig();
+        config.setGroupName(consumerId);
+        try {
+            mqAdminExt.start();
+            Set<String> masterSet = 
CommandUtil.fetchMasterAddrByClusterName(mqAdminExt,
+                clusterName);
+            for (String addr : masterSet) {
+                try {
+                    mqAdminExt.createAndUpdateSubscriptionGroupConfig(addr, 
config);
+                    log.info(String.format("create subscription group %s to %s 
success.\n", consumerId,
+                        addr));
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    Thread.sleep(1000 * 1);
+                }
+            }
+        } catch (Exception e) {
+            createResult = false;
+            e.printStackTrace();
+        }
+        mqAdminExt.shutdown();
+        return createResult;
+    }
+
+    public static ClusterInfo getCluster(String nameSrvAddr) {
+        DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt();
+        mqAdminExt.setNamesrvAddr(nameSrvAddr);
+        ClusterInfo clusterInfo = null;
+        try {
+            mqAdminExt.start();
+            clusterInfo = mqAdminExt.examineBrokerClusterInfo();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        mqAdminExt.shutdown();
+        return clusterInfo;
+    }
+
+    public static boolean isBrokerExist(String ns, String ip) {
+        ClusterInfo clusterInfo = getCluster(ns);
+        if (clusterInfo == null) {
+            return false;
+        } else {
+            HashMap<String, BrokerData> brokers = 
clusterInfo.getBrokerAddrTable();
+            for (String brokerName : brokers.keySet()) {
+                HashMap<Long, String> brokerIps = 
brokers.get(brokerName).getBrokerAddrs();
+                for (long brokerId : brokerIps.keySet()) {
+                    if (brokerIps.get(brokerId).contains(ip))
+                        return true;
+                }
+            }
+        }
+
+        return false;
+    }
+
+    public void getSubConnection(String nameSrvAddr, String clusterName, 
String consumerId) {
+        boolean createResult = true;
+        DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt();
+        mqAdminExt.setNamesrvAddr(nameSrvAddr);
+        SubscriptionGroupConfig config = new SubscriptionGroupConfig();
+        config.setGroupName(consumerId);
+        try {
+            mqAdminExt.start();
+            Set<String> masterSet = 
CommandUtil.fetchMasterAddrByClusterName(mqAdminExt,
+                clusterName);
+            for (String addr : masterSet) {
+                try {
+
+                    System.out.printf("create subscription group %s to %s 
success.\n", consumerId,
+                        addr);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    Thread.sleep(1000 * 1);
+                }
+            }
+        } catch (Exception e) {
+            createResult = false;
+            e.printStackTrace();
+        }
+        mqAdminExt.shutdown();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/util/MQRandomUtils.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/rocketmq/test/util/MQRandomUtils.java 
b/test/src/main/java/org/apache/rocketmq/test/util/MQRandomUtils.java
new file mode 100644
index 0000000..1d82445
--- /dev/null
+++ b/test/src/main/java/org/apache/rocketmq/test/util/MQRandomUtils.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.util;
+
+public class MQRandomUtils {
+    public static String getRandomTopic() {
+        return RandomUtils.getStringByUUID();
+    }
+
+    public static String getRandomConsumerGroup() {
+        return RandomUtils.getStringByUUID();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/util/MQWait.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/rocketmq/test/util/MQWait.java 
b/test/src/main/java/org/apache/rocketmq/test/util/MQWait.java
new file mode 100644
index 0000000..6edeeca
--- /dev/null
+++ b/test/src/main/java/org/apache/rocketmq/test/util/MQWait.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.util;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.test.listener.AbstractListener;
+
+import static com.google.common.truth.Truth.assertThat;
+
+public class MQWait {
+    private static Logger logger = Logger.getLogger(MQWait.class);
+
+    public static boolean waitConsumeAll(int timeoutMills, Collection<Object> 
allSendMsgs,
+        AbstractListener... listeners) {
+        boolean recvAll = false;
+        long startTime = System.currentTimeMillis();
+        Collection<Object> noDupMsgs = new ArrayList<Object>();
+        while (!recvAll) {
+            if ((System.currentTimeMillis() - startTime) < timeoutMills) {
+                noDupMsgs.clear();
+                try {
+                    for (AbstractListener listener : listeners) {
+                        Collection<Object> recvMsgs = Collections
+                            
.synchronizedCollection(listener.getAllUndupMsgBody());
+                        
noDupMsgs.addAll(VerifyUtils.getFilterdMessage(allSendMsgs, recvMsgs));
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+
+                try {
+                    assertThat(noDupMsgs).containsAllIn(allSendMsgs);
+                    recvAll = true;
+                    break;
+                } catch (Throwable e) {
+                }
+                TestUtil.waitForMonment(500);
+            } else {
+                logger.error(String.format(
+                    "timeout but still not receive all 
messages,expectSize[%s],realSize[%s]",
+                    allSendMsgs.size(), noDupMsgs.size()));
+                break;
+            }
+        }
+
+        return recvAll;
+    }
+
+    public static void setCondition(Condition condition, int waitTimeMills, 
int intervalMills) {
+        long startTime = System.currentTimeMillis();
+        while (!condition.meetCondition()) {
+            if (System.currentTimeMillis() - startTime > waitTimeMills) {
+                logger.error("time out,but contidion still not meet!");
+                break;
+            } else {
+                TestUtil.waitForMonment(intervalMills);
+            }
+        }
+    }
+
+    public static void main(String args[]) {
+
+        long start = System.currentTimeMillis();
+        MQWait.setCondition(new Condition() {
+            int i = 0;
+
+            public boolean meetCondition() {
+                i++;
+                return i == 100;
+            }
+        }, 10 * 1000, 200);
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/util/RandomUtil.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/rocketmq/test/util/RandomUtil.java 
b/test/src/main/java/org/apache/rocketmq/test/util/RandomUtil.java
new file mode 100644
index 0000000..1c2bdac
--- /dev/null
+++ b/test/src/main/java/org/apache/rocketmq/test/util/RandomUtil.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.util;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+
+public final class RandomUtil {
+
+    private static final int UNICODE_START = '\u4E00';
+    private static final int UNICODE_END = '\u9FA0';
+    private static Random rd = new Random();
+
+    private RandomUtil() {
+
+    }
+
+    public static long getLong() {
+        return rd.nextLong();
+    }
+
+    public static long getLongMoreThanZero() {
+        long res = rd.nextLong();
+        while (res <= 0) {
+            res = rd.nextLong();
+        }
+        return res;
+    }
+
+    public static long getLongLessThan(long n) {
+        long res = rd.nextLong();
+        return res % n;
+    }
+
+    public static long getLongMoreThanZeroLessThan(long n) {
+        long res = getLongLessThan(n);
+        while (res <= 0) {
+            res = getLongLessThan(n);
+        }
+        return res;
+    }
+
+    public static long getLongBetween(long n, long m) {
+        if (m <= n) {
+            return n;
+        }
+        long res = getLongMoreThanZero();
+        return n + res % (m - n);
+    }
+
+    public static int getInteger() {
+        return rd.nextInt();
+    }
+
+    public static int getIntegerMoreThanZero() {
+        int res = rd.nextInt();
+        while (res <= 0) {
+            res = rd.nextInt();
+        }
+        return res;
+    }
+
+    public static int getIntegerLessThan(int n) {
+        int res = rd.nextInt();
+        return res % n;
+    }
+
+    public static int getIntegerMoreThanZeroLessThan(int n) {
+        int res = rd.nextInt(n);
+        while (res == 0) {
+            res = rd.nextInt(n);
+        }
+        return res;
+    }
+
+    public static int getIntegerBetween(int n, int m)// 
m��ֵ����Ϊ���أ�
+    {
+        if (m == n) {
+            return n;
+        }
+        int res = getIntegerMoreThanZero();
+        return n + res % (m - n);
+    }
+
+    private static char getChar(int arg[]) {
+        int size = arg.length;
+        int c = rd.nextInt(size / 2);
+        c = c * 2;
+        return (char) (getIntegerBetween(arg[c], arg[c + 1]));
+    }
+
+    private static String getString(int n, int arg[]) {
+        StringBuilder res = new StringBuilder();
+        for (int i = 0; i < n; i++) {
+            res.append(getChar(arg));
+        }
+        return res.toString();
+    }
+
+    public static String getStringWithCharacter(int n) {
+        int arg[] = new int[] {'a', 'z' + 1, 'A', 'Z' + 1};
+        return getString(n, arg);
+    }
+
+    public static String getStringWithNumber(int n) {
+        int arg[] = new int[] {'0', '9' + 1};
+        return getString(n, arg);
+    }
+
+    public static String getStringWithNumAndCha(int n) {
+        int arg[] = new int[] {'a', 'z' + 1, 'A', 'Z' + 1, '0', '9' + 1};
+        return getString(n, arg);
+    }
+
+    public static String getStringShortenThan(int n) {
+        int len = getIntegerMoreThanZeroLessThan(n);
+        return getStringWithCharacter(len);
+    }
+
+    public static String getStringWithNumAndChaShortenThan(int n) {
+        int len = getIntegerMoreThanZeroLessThan(n);
+        return getStringWithNumAndCha(len);
+    }
+
+    public static String getStringBetween(int n, int m) {
+        int len = getIntegerBetween(n, m);
+        return getStringWithCharacter(len);
+    }
+
+    public static String getStringWithNumAndChaBetween(int n, int m) {
+        int len = getIntegerBetween(n, m);
+        return getStringWithNumAndCha(len);
+    }
+
+    public static String getStringWithPrefix(int n, String prefix) {
+        int len = prefix.length();
+        if (n <= len)
+            return prefix;
+        else {
+            len = n - len;
+            StringBuilder res = new StringBuilder(prefix);
+            res.append(getStringWithCharacter(len));
+            return res.toString();
+        }
+    }
+
+    public static String getStringWithSuffix(int n, String suffix) {
+
+        int len = suffix.length();
+        if (n <= len)
+            return suffix;
+        else {
+            len = n - len;
+            StringBuilder res = new StringBuilder();
+            res.append(getStringWithCharacter(len));
+            res.append(suffix);
+            return res.toString();
+        }
+    }
+
+    public static String getStringWithBoth(int n, String prefix, String 
suffix) {
+        int len = prefix.length() + suffix.length();
+        StringBuilder res = new StringBuilder(prefix);
+        if (n <= len)
+            return res.append(suffix).toString();
+        else {
+            len = n - len;
+            res.append(getStringWithCharacter(len));
+            res.append(suffix);
+            return res.toString();
+        }
+    }
+
+    public static String getCheseWordWithPrifix(int n, String prefix) {
+        int len = prefix.length();
+        if (n <= len)
+            return prefix;
+        else {
+            len = n - len;
+            StringBuilder res = new StringBuilder(prefix);
+            res.append(getCheseWord(len));
+            return res.toString();
+        }
+    }
+
+    public static String getCheseWordWithSuffix(int n, String suffix) {
+
+        int len = suffix.length();
+        if (n <= len)
+            return suffix;
+        else {
+            len = n - len;
+            StringBuilder res = new StringBuilder();
+            res.append(getCheseWord(len));
+            res.append(suffix);
+            return res.toString();
+        }
+    }
+
+    public static String getCheseWordWithBoth(int n, String prefix, String 
suffix) {
+        int len = prefix.length() + suffix.length();
+        StringBuilder res = new StringBuilder(prefix);
+        if (n <= len)
+            return res.append(suffix).toString();
+        else {
+            len = n - len;
+            res.append(getCheseWord(len));
+            res.append(suffix);
+            return res.toString();
+        }
+    }
+
+    public static String getCheseWord(int len) {
+        StringBuilder res = new StringBuilder();
+        for (int i = 0; i < len; i++) {
+            char str = getCheseChar();
+            res.append(str);
+        }
+        return res.toString();
+    }
+
+    private static char getCheseChar() {
+        return (char) (UNICODE_START + rd.nextInt(UNICODE_END - 
UNICODE_START));
+    }
+
+    public static boolean getBoolean() {
+        return getIntegerMoreThanZeroLessThan(3) == 1;
+    }
+
+    public static String getStringByUUID() {
+        return UUID.randomUUID().toString();
+    }
+
+    public static int[] getRandomArray(int min, int max, int n) {
+        int len = max - min + 1;
+
+        if (max < min || n > len) {
+            return null;
+        }
+
+        int[] source = new int[len];
+        for (int i = min; i < min + len; i++) {
+            source[i - min] = i;
+        }
+
+        int[] result = new int[n];
+        Random rd = new Random();
+        int index = 0;
+        for (int i = 0; i < result.length; i++) {
+            index = Math.abs(rd.nextInt() % len--);
+            result[i] = source[index];
+            source[index] = source[len];
+        }
+        return result;
+    }
+
+    public static Collection<Integer> getRandomCollection(int min, int max, 
int n) {
+        Set<Integer> res = new HashSet<Integer>();
+        int mx = max;
+        int mn = min;
+        if (n == (max + 1 - min)) {
+            for (int i = 1; i <= n; i++) {
+                res.add(i);
+            }
+            return res;
+        }
+        for (int i = 0; i < n; i++) {
+            int v = getIntegerBetween(mn, mx);
+            if (v == mx) {
+                mx--;
+            }
+            if (v == mn) {
+                mn++;
+            }
+            while (res.contains(v)) {
+                v = getIntegerBetween(mn, mx);
+                if (v == mx) {
+                    mx = v;
+                }
+                if (v == mn) {
+                    mn = v;
+                }
+            }
+            res.add(v);
+        }
+        return res;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/util/RandomUtils.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/rocketmq/test/util/RandomUtils.java 
b/test/src/main/java/org/apache/rocketmq/test/util/RandomUtils.java
new file mode 100644
index 0000000..9eca28b
--- /dev/null
+++ b/test/src/main/java/org/apache/rocketmq/test/util/RandomUtils.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.util;
+
+import java.util.Random;
+import java.util.UUID;
+
+public class RandomUtils {
+    private static final int UNICODE_START = '\u4E00';
+    private static final int UNICODE_END = '\u9FA0';
+    private static Random rd = new Random();
+
+    private RandomUtils() {
+
+    }
+
+    public static String getStringByUUID() {
+        return UUID.randomUUID().toString();
+    }
+
+    public static String getCheseWord(int len) {
+        StringBuilder res = new StringBuilder();
+
+        for (int i = 0; i < len; ++i) {
+            char str = getCheseChar();
+            res.append(str);
+        }
+
+        return res.toString();
+    }
+
+    public static String getStringWithNumber(int n) {
+        int arg[] = new int[] {'0', '9' + 1};
+        return getString(n, arg);
+    }
+
+    public static String getStringWithCharacter(int n) {
+        int arg[] = new int[] {'a', 'z' + 1, 'A', 'Z' + 1};
+        return getString(n, arg);
+    }
+
+    private static String getString(int n, int arg[]) {
+        StringBuilder res = new StringBuilder();
+        for (int i = 0; i < n; i++) {
+            res.append(getChar(arg));
+        }
+        return res.toString();
+    }
+
+    private static char getChar(int arg[]) {
+        int size = arg.length;
+        int c = rd.nextInt(size / 2);
+        c = c * 2;
+        return (char) (getIntegerBetween(arg[c], arg[c + 1]));
+    }
+
+    public static int getIntegerBetween(int n, int m) {
+        if (m == n) {
+            return n;
+        }
+        int res = getIntegerMoreThanZero();
+        return n + res % (m - n);
+    }
+
+    public static int getIntegerMoreThanZero() {
+        int res = rd.nextInt();
+        while (res <= 0) {
+            res = rd.nextInt();
+        }
+        return res;
+    }
+
+    private static char getCheseChar() {
+        return (char) (UNICODE_START + rd.nextInt(UNICODE_END - 
UNICODE_START));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/util/TestUtil.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/rocketmq/test/util/TestUtil.java 
b/test/src/main/java/org/apache/rocketmq/test/util/TestUtil.java
new file mode 100644
index 0000000..591b3b7
--- /dev/null
+++ b/test/src/main/java/org/apache/rocketmq/test/util/TestUtil.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.util;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public final class TestUtil {
+
+    private TestUtil() {
+    }
+
+    public static Long parseStringToLong(String s, Long defval) {
+        Long val = defval;
+        try {
+            val = Long.parseLong(s);
+        } catch (NumberFormatException e) {
+            val = defval;
+        }
+        return val;
+    }
+
+    public static Integer parseStringToInteger(String s, Integer defval) {
+        Integer val = defval;
+        try {
+            val = Integer.parseInt(s);
+        } catch (NumberFormatException e) {
+            val = defval;
+        }
+        return val;
+    }
+
+    public static String addQuoteToParamater(String param) {
+        StringBuilder sb = new StringBuilder("'");
+        sb.append(param).append("'");
+        return sb.toString();
+    }
+
+    public static void waitForMonment(long time) {
+        try {
+            Thread.sleep(time);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+
+    public static void waitForSeconds(long time) {
+        try {
+            TimeUnit.SECONDS.sleep(time);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+
+    public static void waitForMinutes(long time) {
+        try {
+            TimeUnit.MINUTES.sleep(time);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+
+    public static void waitForInputQuit() {
+        waitForInput("quit");
+    }
+
+    public static void waitForInput(String keyWord) {
+        waitForInput(keyWord,
+            String.format("The thread will wait until you input stop 
command[%s]:", keyWord));
+    }
+
+    public static void waitForInput(String keyWord, String info) {
+        try {
+            byte[] b = new byte[1024];
+            int n = System.in.read(b);
+            String s = new String(b, 0, n - 1).replace("\r", "").replace("\n", 
"");
+            while (!s.equals(keyWord)) {
+                n = System.in.read(b);
+                s = new String(b, 0, n - 1);
+            }
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    public static <K, V extends Comparable<? super V>> Map<K, V> 
sortByValue(Map<K, V> map) {
+        List<Map.Entry<K, V>> list = new LinkedList<Map.Entry<K, 
V>>(map.entrySet());
+        Collections.sort(list, new Comparator<Map.Entry<K, V>>() {
+            public int compare(Map.Entry<K, V> o1, Map.Entry<K, V> o2) {
+                return (o1.getValue()).compareTo(o2.getValue());
+            }
+        });
+
+        Map<K, V> result = new LinkedHashMap<K, V>();
+        for (Map.Entry<K, V> entry : list) {
+            result.put(entry.getKey(), entry.getValue());
+        }
+        return result;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/util/TestUtils.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/rocketmq/test/util/TestUtils.java 
b/test/src/main/java/org/apache/rocketmq/test/util/TestUtils.java
new file mode 100644
index 0000000..6326d46
--- /dev/null
+++ b/test/src/main/java/org/apache/rocketmq/test/util/TestUtils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.util;
+
+import java.util.concurrent.TimeUnit;
+
+public class TestUtils {
+    public static void waitForMonment(long time) {
+        try {
+            Thread.sleep(time);
+        } catch (InterruptedException var3) {
+            var3.printStackTrace();
+        }
+
+    }
+
+    public static void waitForSeconds(long time) {
+        try {
+            TimeUnit.SECONDS.sleep(time);
+        } catch (InterruptedException var3) {
+            var3.printStackTrace();
+        }
+
+    }
+
+    public static void waitForMinutes(long time) {
+        try {
+            TimeUnit.MINUTES.sleep(time);
+        } catch (InterruptedException var3) {
+            var3.printStackTrace();
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/util/VerifyUtils.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/rocketmq/test/util/VerifyUtils.java 
b/test/src/main/java/org/apache/rocketmq/test/util/VerifyUtils.java
new file mode 100644
index 0000000..965d2ee
--- /dev/null
+++ b/test/src/main/java/org/apache/rocketmq/test/util/VerifyUtils.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.util;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.common.message.MessageExt;
+
+public class VerifyUtils {
+    private static Logger logger = Logger.getLogger(VerifyUtils.class);
+
+    public static int verify(Collection<Object> sendMsgs, Collection<Object> 
recvMsgs) {
+        int miss = 0;
+        for (Object msg : sendMsgs) {
+            if (!recvMsgs.contains(msg)) {
+                miss++;
+            }
+        }
+
+        return miss;
+    }
+
+    public static Collection<Object> getFilterdMessage(Collection<Object> 
sendMsgs,
+        Collection<Object> recvMsgs) {
+        Collection<Object> recvMsgsSync = 
Collections.synchronizedCollection(recvMsgs);
+        Collection<Object> filterdMsgs = new ArrayList<Object>();
+        int filterNum = 0;
+        for (Object msg : recvMsgsSync) {
+            if (sendMsgs.contains(msg)) {
+                filterdMsgs.add(msg);
+            } else {
+                filterNum++;
+            }
+        }
+
+        logger.info(String.format("[%s] messages is filterd!", filterNum));
+        return filterdMsgs;
+    }
+
+    public static int verifyUserProperty(Collection<Object> sendMsgs, 
Collection<Object> recvMsgs) {
+        return 0;
+    }
+
+    public static void verifyMessageQueueId(int expectId, Collection<Object> 
msgs) {
+        for (Object msg : msgs) {
+            MessageExt msgEx = (MessageExt) msg;
+            assert expectId == msgEx.getQueueId();
+        }
+    }
+
+    public static boolean verifyBalance(int msgSize, float error, int... 
recvSize) {
+        boolean balance = true;
+        int evenSize = msgSize / recvSize.length;
+        for (int size : recvSize) {
+            if (Math.abs(size - evenSize) > error * evenSize) {
+                balance = false;
+                break;
+            }
+        }
+        return balance;
+    }
+
+    public static boolean verifyBalance(int msgSize, int... recvSize) {
+        return verifyBalance(msgSize, 0.1f, recvSize);
+    }
+
+    public static boolean verifyDelay(long delayTimeMills, Collection<Object> 
recvMsgTimes,
+        int errorMills) {
+        boolean delay = true;
+        for (Object timeObj : recvMsgTimes) {
+            long time = (Long) timeObj;
+            if (Math.abs(time - delayTimeMills) > errorMills) {
+                delay = false;
+                logger.info(String.format("delay error:%s", Math.abs(time - 
delayTimeMills)));
+            }
+        }
+        return delay;
+    }
+
+    public static boolean verifyDelay(long delayTimeMills, Collection<Object> 
recvMsgTimes) {
+        int errorMills = 500;
+        return verifyDelay(delayTimeMills, recvMsgTimes, errorMills);
+    }
+
+    public static boolean verifyOrder(Collection<Collection<Object>> 
queueMsgs) {
+        for (Collection<Object> msgs : queueMsgs) {
+            if (!verifyOrderMsg(msgs)) {
+                return false;
+            }
+        }
+        return true;
+
+    }
+
+    public static boolean verifyOrderMsg(Collection<Object> msgs) {
+        int min = Integer.MIN_VALUE;
+        int curr;
+        if (msgs.size() == 0 || msgs.size() == 1) {
+            return true;
+        } else {
+            for (Object msg : msgs) {
+                curr = Integer.parseInt((String) msg);
+                if (curr < min) {
+                    return false;
+                } else {
+                    min = curr;
+                }
+            }
+        }
+        return true;
+    }
+
+    public static boolean verifyRT(Collection<Object> rts, long maxRTMills) {
+        boolean rtExpect = true;
+        for (Object obj : rts) {
+            long rt = (Long) obj;
+            if (rt > maxRTMills) {
+                rtExpect = false;
+                logger.info(String.format("%s greater thran maxtRT:%s!", rt, 
maxRTMills));
+
+            }
+        }
+        return rtExpect;
+    }
+
+    public static void main(String args[]) {
+        verifyBalance(400, 0.1f, 230, 190);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/util/data/collect/DataCollector.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/rocketmq/test/util/data/collect/DataCollector.java
 
b/test/src/main/java/org/apache/rocketmq/test/util/data/collect/DataCollector.java
new file mode 100644
index 0000000..cbbc8a5
--- /dev/null
+++ 
b/test/src/main/java/org/apache/rocketmq/test/util/data/collect/DataCollector.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.util.data.collect;
+
+import java.util.Collection;
+
+public interface DataCollector {
+
+    void resetData();
+
+    Collection<Object> getAllData();
+
+    Collection<Object> getAllDataWithoutDuplicate();
+
+    void addData(Object data);
+
+    long getDataSizeWithoutDuplicate();
+
+    long getDataSize();
+
+    boolean isRepeatedData(Object data);
+
+    int getRepeatedTimeForData(Object data);
+
+    void removeData(Object data);
+
+    void lockIncrement();
+
+    void unlockIncrement();
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/util/data/collect/DataCollectorManager.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/rocketmq/test/util/data/collect/DataCollectorManager.java
 
b/test/src/main/java/org/apache/rocketmq/test/util/data/collect/DataCollectorManager.java
new file mode 100644
index 0000000..47f4d81
--- /dev/null
+++ 
b/test/src/main/java/org/apache/rocketmq/test/util/data/collect/DataCollectorManager.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.util.data.collect;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.rocketmq.test.util.data.collect.impl.ListDataCollectorImpl;
+import org.apache.rocketmq.test.util.data.collect.impl.MapDataCollectorImpl;
+
+public final class DataCollectorManager {
+    private static DataCollectorManager instance = new DataCollectorManager();
+    private Map<String, DataCollector> collectMap = new HashMap<String, 
DataCollector>();
+    private Object lock = new Object();
+
+    private DataCollectorManager() {
+    }
+
+    public static DataCollectorManager getInstance() {
+        return instance;
+    }
+
+    public DataCollector fetchDataCollector(String key) {
+        String realKey = key;
+        if (!collectMap.containsKey(realKey)) {
+            synchronized (lock) {
+                if (!collectMap.containsKey(realKey)) {
+                    DataCollector collect = (DataCollector) new 
MapDataCollectorImpl();
+                    collectMap.put(realKey, collect);
+                }
+            }
+        }
+        return collectMap.get(realKey);
+    }
+
+    public DataCollector fetchMapDataCollector(String key) {
+        String realKey = key;
+        if (!collectMap.containsKey(realKey)
+            || collectMap.get(realKey) instanceof ListDataCollectorImpl) {
+            synchronized (lock) {
+                if (!collectMap.containsKey(realKey)
+                    || collectMap.get(realKey) instanceof 
ListDataCollectorImpl) {
+                    DataCollector collect = null;
+                    if (collectMap.containsKey(realKey)) {
+                        DataCollector src = collectMap.get(realKey);
+                        collect = new MapDataCollectorImpl(src.getAllData());
+                    } else {
+                        collect = new MapDataCollectorImpl();
+                    }
+                    collectMap.put(realKey, collect);
+
+                }
+            }
+        }
+        return collectMap.get(realKey);
+    }
+
+    public DataCollector fetchListDataCollector(String key) {
+        String realKey = key;
+        if (!collectMap.containsKey(realKey)
+            || collectMap.get(realKey) instanceof MapDataCollectorImpl) {
+            synchronized (lock) {
+                if (!collectMap.containsKey(realKey)
+                    || collectMap.get(realKey) instanceof 
MapDataCollectorImpl) {
+                    DataCollector collect = null;
+                    if (collectMap.containsKey(realKey)) {
+                        DataCollector src = collectMap.get(realKey);
+                        collect = new ListDataCollectorImpl(src.getAllData());
+                    } else {
+                        collect = new ListDataCollectorImpl();
+                    }
+                    collectMap.put(realKey, collect);
+                }
+            }
+        }
+        return collectMap.get(realKey);
+    }
+
+    public void resetDataCollect(String key) {
+        if (collectMap.containsKey(key)) {
+            collectMap.get(key).resetData();
+        }
+    }
+
+    public void resetAll() {
+        for (Map.Entry<String, DataCollector> entry : collectMap.entrySet()) {
+            entry.getValue().resetData();
+        }
+    }
+
+    public void removeDataCollect(String key) {
+        collectMap.remove(key);
+    }
+
+    public void removeAll() {
+        collectMap.clear();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/util/data/collect/DataFilter.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/rocketmq/test/util/data/collect/DataFilter.java 
b/test/src/main/java/org/apache/rocketmq/test/util/data/collect/DataFilter.java
new file mode 100644
index 0000000..b01adc5
--- /dev/null
+++ 
b/test/src/main/java/org/apache/rocketmq/test/util/data/collect/DataFilter.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.util.data.collect;
+
+public interface DataFilter {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/util/data/collect/impl/ListDataCollectorImpl.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/rocketmq/test/util/data/collect/impl/ListDataCollectorImpl.java
 
b/test/src/main/java/org/apache/rocketmq/test/util/data/collect/impl/ListDataCollectorImpl.java
new file mode 100644
index 0000000..82ab461
--- /dev/null
+++ 
b/test/src/main/java/org/apache/rocketmq/test/util/data/collect/impl/ListDataCollectorImpl.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.util.data.collect.impl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import org.apache.rocketmq.test.util.data.collect.DataCollector;
+
+public class ListDataCollectorImpl implements DataCollector {
+
+    private List<Object> datas = new ArrayList<Object>();
+    private boolean lock = false;
+
+    public ListDataCollectorImpl() {
+
+    }
+
+    public ListDataCollectorImpl(Collection<Object> datas) {
+        for (Object data : datas) {
+            addData(data);
+        }
+    }
+
+    public Collection<Object> getAllData() {
+        return datas;
+    }
+
+    public void resetData() {
+        datas.clear();
+        unlockIncrement();
+    }
+
+    public long getDataSizeWithoutDuplicate() {
+        return getAllDataWithoutDuplicate().size();
+    }
+
+    public synchronized void addData(Object data) {
+        if (lock) {
+            return;
+        }
+        datas.add(data);
+    }
+
+    public long getDataSize() {
+        return datas.size();
+    }
+
+    public boolean isRepeatedData(Object data) {
+        return Collections.frequency(datas, data) == 1;
+    }
+
+    public Collection<Object> getAllDataWithoutDuplicate() {
+        return new HashSet<Object>(datas);
+    }
+
+    public int getRepeatedTimeForData(Object data) {
+        int res = 0;
+        for (Object obj : datas) {
+            if (obj.equals(data)) {
+                res++;
+            }
+        }
+        return res;
+    }
+
+    public void removeData(Object data) {
+        datas.remove(data);
+    }
+
+    public void lockIncrement() {
+        lock = true;
+    }
+
+    public void unlockIncrement() {
+        lock = false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/util/data/collect/impl/MapDataCollectorImpl.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/rocketmq/test/util/data/collect/impl/MapDataCollectorImpl.java
 
b/test/src/main/java/org/apache/rocketmq/test/util/data/collect/impl/MapDataCollectorImpl.java
new file mode 100644
index 0000000..899bb85
--- /dev/null
+++ 
b/test/src/main/java/org/apache/rocketmq/test/util/data/collect/impl/MapDataCollectorImpl.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.util.data.collect.impl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.test.util.data.collect.DataCollector;
+
+public class MapDataCollectorImpl implements DataCollector {
+
+    private Map<Object, AtomicInteger> datas = new ConcurrentHashMap<Object, 
AtomicInteger>();
+    private boolean lock = false;
+
+    public MapDataCollectorImpl() {
+
+    }
+
+    public MapDataCollectorImpl(Collection<Object> datas) {
+        for (Object data : datas) {
+            addData(data);
+        }
+    }
+
+    public synchronized void addData(Object data) {
+        if (lock) {
+            return;
+        }
+        if (datas.containsKey(data)) {
+            datas.get(data).addAndGet(1);
+        } else {
+            datas.put(data, new AtomicInteger(1));
+        }
+    }
+
+    public Collection<Object> getAllData() {
+        List<Object> lst = new ArrayList<Object>();
+        for (Entry<Object, AtomicInteger> entry : datas.entrySet()) {
+            for (int i = 0; i < entry.getValue().get(); i++) {
+                lst.add(entry.getKey());
+            }
+        }
+        return lst;
+    }
+
+    public long getDataSizeWithoutDuplicate() {
+        return datas.keySet().size();
+    }
+
+    public void resetData() {
+        datas.clear();
+        unlockIncrement();
+    }
+
+    public long getDataSize() {
+        long sum = 0;
+        for (AtomicInteger count : datas.values()) {
+            sum = sum + count.get();
+        }
+        return sum;
+    }
+
+    public boolean isRepeatedData(Object data) {
+        if (datas.containsKey(data)) {
+            return datas.get(data).get() == 1;
+        }
+        return false;
+    }
+
+    public Collection<Object> getAllDataWithoutDuplicate() {
+        return datas.keySet();
+    }
+
+    public int getRepeatedTimeForData(Object data) {
+        if (datas.containsKey(data)) {
+            return datas.get(data).intValue();
+        }
+        return 0;
+    }
+
+    public void removeData(Object data) {
+        datas.remove(data);
+    }
+
+    public void lockIncrement() {
+        lock = true;
+    }
+
+    public void unlockIncrement() {
+        lock = false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/util/parallel/ParallelTask.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/rocketmq/test/util/parallel/ParallelTask.java 
b/test/src/main/java/org/apache/rocketmq/test/util/parallel/ParallelTask.java
new file mode 100644
index 0000000..a4ad9a8
--- /dev/null
+++ 
b/test/src/main/java/org/apache/rocketmq/test/util/parallel/ParallelTask.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.util.parallel;
+
+import java.util.concurrent.CountDownLatch;
+
+public abstract class ParallelTask extends Thread {
+    private CountDownLatch latch = null;
+
+    public CountDownLatch getLatch() {
+        return latch;
+    }
+
+    public void setLatch(CountDownLatch latch) {
+        this.latch = latch;
+    }
+
+    public abstract void execute();
+
+    @Override
+    public void run() {
+        this.execute();
+
+        if (latch != null) {
+            latch.countDown();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/util/parallel/ParallelTaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/rocketmq/test/util/parallel/ParallelTaskExecutor.java
 
b/test/src/main/java/org/apache/rocketmq/test/util/parallel/ParallelTaskExecutor.java
new file mode 100644
index 0000000..e7e9209
--- /dev/null
+++ 
b/test/src/main/java/org/apache/rocketmq/test/util/parallel/ParallelTaskExecutor.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.util.parallel;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+public class ParallelTaskExecutor {
+    public List<ParallelTask> tasks = new ArrayList<ParallelTask>();
+    public ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
+    public CountDownLatch latch = null;
+
+    public ParallelTaskExecutor() {
+
+    }
+
+    public void pushTask(ParallelTask task) {
+        tasks.add(task);
+    }
+
+    public void startBlock() {
+        init();
+        startTask();
+        try {
+            latch.await();
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+
+    public void startNoBlock() {
+        for (ParallelTask task : tasks) {
+            cachedThreadPool.execute(task);
+        }
+    }
+
+    private void init() {
+        latch = new CountDownLatch(tasks.size());
+        for (ParallelTask task : tasks) {
+            task.setLatch(latch);
+        }
+    }
+
+    private void startTask() {
+        for (ParallelTask task : tasks) {
+            task.start();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/main/java/org/apache/rocketmq/test/util/parallel/Task4Test.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/rocketmq/test/util/parallel/Task4Test.java 
b/test/src/main/java/org/apache/rocketmq/test/util/parallel/Task4Test.java
new file mode 100644
index 0000000..c168d66
--- /dev/null
+++ b/test/src/main/java/org/apache/rocketmq/test/util/parallel/Task4Test.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.util.parallel;
+
+public class Task4Test extends ParallelTask {
+    private String name = "";
+
+    public Task4Test(String name) {
+        this.name = name;
+    }
+
+    @Override
+    public void execute() {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java 
b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
new file mode 100644
index 0000000..57462a2
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.base;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.namesrv.NamesrvController;
+import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.clientinterface.AbstractMQConsumer;
+import org.apache.rocketmq.test.clientinterface.AbstractMQProducer;
+import org.apache.rocketmq.test.factory.ConsumerFactory;
+import org.apache.rocketmq.test.listener.AbstractListener;
+import org.apache.rocketmq.test.util.MQAdmin;
+import org.apache.rocketmq.test.util.MQRandomUtils;
+import org.apache.rocketmq.test.util.TestUtils;
+import org.junit.Assert;
+
+public class BaseConf {
+    protected static String nsAddr;
+    protected static String broker1Name;
+    protected static String broker2Name;
+    protected static String clusterName;
+    protected static int brokerNum;
+    protected static int waitTime = 5;
+    protected static int consumeTime = 1 * 60 * 1000;
+    protected static int topicCreateTime = 30 * 1000;
+    protected static NamesrvController namesrvController;
+    protected static BrokerController brokerController1;
+    protected static BrokerController brokerController2;
+    protected static List<Object> mqClients = new ArrayList<Object>();
+    protected static boolean debug = false;
+    private static Logger log = Logger.getLogger(BaseConf.class);
+
+    static {
+        namesrvController = IntegrationTestBase.createAndStartNamesrv();
+        nsAddr = "127.0.0.1:" + 
namesrvController.getNettyServerConfig().getListenPort();
+        brokerController1 = IntegrationTestBase.createAndStartBroker(nsAddr);
+        brokerController2 = IntegrationTestBase.createAndStartBroker(nsAddr);
+        clusterName = 
brokerController1.getBrokerConfig().getBrokerClusterName();
+        broker1Name = brokerController1.getBrokerConfig().getBrokerName();
+        broker2Name = brokerController2.getBrokerConfig().getBrokerName();
+        brokerNum = 2;
+    }
+
+    public BaseConf() {
+
+    }
+
+    public static String initTopic() {
+        long startTime = System.currentTimeMillis();
+        String topic = MQRandomUtils.getRandomTopic();
+        boolean createResult = false;
+        while (true) {
+            createResult = MQAdmin.createTopic(nsAddr, clusterName, topic, 8);
+            if (createResult) {
+                break;
+            } else if (System.currentTimeMillis() - startTime > 
topicCreateTime) {
+                Assert.fail(String.format("topic[%s] is created failed 
after:%d ms", topic,
+                    System.currentTimeMillis() - startTime));
+                break;
+            } else {
+                TestUtils.waitForMonment(500);
+                continue;
+            }
+        }
+
+        return topic;
+    }
+
+    public static String initConsumerGroup() {
+        String group = MQRandomUtils.getRandomConsumerGroup();
+        return initConsumerGroup(group);
+    }
+
+    public static String initConsumerGroup(String group) {
+        MQAdmin.createSub(nsAddr, clusterName, group);
+        return group;
+    }
+
+    public static RMQNormalProducer getProducer(String nsAddr, String topic) {
+        RMQNormalProducer producer = new RMQNormalProducer(nsAddr, topic);
+        if (debug) {
+            producer.setDebug();
+        }
+        mqClients.add(producer);
+        return producer;
+    }
+
+    public static RMQNormalProducer getProducer(String nsAddr, String topic, 
String producerGoup,
+        String instanceName) {
+        RMQNormalProducer producer = new RMQNormalProducer(nsAddr, topic, 
producerGoup,
+            instanceName);
+        if (debug) {
+            producer.setDebug();
+        }
+        mqClients.add(producer);
+        return producer;
+    }
+
+    public static RMQAsyncSendProducer getAsyncProducer(String nsAddr, String 
topic) {
+        RMQAsyncSendProducer producer = new RMQAsyncSendProducer(nsAddr, 
topic);
+        if (debug) {
+            producer.setDebug();
+        }
+        mqClients.add(producer);
+        return producer;
+    }
+
+    public static RMQNormalConsumer getConsumer(String nsAddr, String topic, 
String subExpression,
+        AbstractListener listner) {
+        String consumerGroup = initConsumerGroup();
+        return getConsumer(nsAddr, consumerGroup, topic, subExpression, 
listner);
+    }
+
+    public static RMQNormalConsumer getConsumer(String nsAddr, String 
consumerGroup, String topic,
+        String subExpression, AbstractListener listner) {
+        RMQNormalConsumer consumer = 
ConsumerFactory.getRMQNormalConsumer(nsAddr, consumerGroup,
+            topic, subExpression, listner);
+        if (debug) {
+            consumer.setDebug();
+        }
+        mqClients.add(consumer);
+        log.info(String.format("consumer[%s] 
start,topic[%s],subExpression[%s]", consumerGroup,
+            topic, subExpression));
+        return consumer;
+    }
+
+    public static void shutDown() {
+        try {
+            for (Object mqClient : mqClients) {
+                if (mqClient instanceof AbstractMQProducer) {
+                    ((AbstractMQProducer) mqClient).shutdown();
+
+                } else {
+                    ((AbstractMQConsumer) mqClient).shutdown();
+                }
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java 
b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
new file mode 100644
index 0000000..ff9996d
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.base;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.namesrv.NamesrvConfig;
+import org.apache.rocketmq.namesrv.NamesrvController;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class IntegrationTestBase {
+    protected static final String SEP = File.separator;
+    protected static final String BROKER_NAME_PREFIX = "TestBrokerName_";
+    protected static final AtomicInteger BROKER_INDEX = new AtomicInteger(0);
+    protected static final List<File> TMPE_FILES = new ArrayList<>();
+    protected static final List<BrokerController> BROKER_CONTROLLERS = new 
ArrayList<>();
+    protected static final List<NamesrvController> NAMESRV_CONTROLLERS = new 
ArrayList<>();
+    public static Logger logger = 
LoggerFactory.getLogger(IntegrationTestBase.class);
+    protected static Random random = new Random();
+
+    static {
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override public void run() {
+                for (NamesrvController namesrvController : 
NAMESRV_CONTROLLERS) {
+                    if (namesrvController != null) {
+                        namesrvController.shutdown();
+                    }
+                }
+                for (BrokerController brokerController : BROKER_CONTROLLERS) {
+                    if (brokerController != null) {
+                        brokerController.shutdown();
+                    }
+                }
+                for (File file : TMPE_FILES) {
+                    deleteFile(file);
+                }
+            }
+        });
+
+    }
+
+    private static String createBaseDir() {
+        String baseDir = System.getProperty("user.home") + SEP + 
"unitteststore-" + UUID.randomUUID();
+        final File file = new File(baseDir);
+        if (file.exists()) {
+            logger.info(String.format("[%s] has already existed, please bake 
up and remove it for integration tests", baseDir));
+            System.exit(1);
+        }
+        TMPE_FILES.add(file);
+        return baseDir;
+    }
+
+    public static NamesrvController createAndStartNamesrv() {
+        String baseDir = createBaseDir();
+        NamesrvConfig namesrvConfig = new NamesrvConfig();
+        NettyServerConfig nameServerNettyServerConfig = new 
NettyServerConfig();
+        namesrvConfig.setKvConfigPath(baseDir + SEP + "namesrv" + SEP + 
"kvConfig.json");
+        namesrvConfig.setConfigStorePath(baseDir + SEP + "namesrv" + SEP + 
"namesrv.properties");
+
+        nameServerNettyServerConfig.setListenPort(9000 + random.nextInt(1000));
+        NamesrvController namesrvController = new 
NamesrvController(namesrvConfig, nameServerNettyServerConfig);
+        try {
+            Assert.assertTrue(namesrvController.initialize());
+            logger.info("Name Server Start:{}", 
nameServerNettyServerConfig.getListenPort());
+            namesrvController.start();
+        } catch (Exception e) {
+            logger.info("Name Server start failed");
+            System.exit(1);
+        }
+        NAMESRV_CONTROLLERS.add(namesrvController);
+        return namesrvController;
+
+    }
+
+    public static BrokerController createAndStartBroker(String nsAddr) {
+        String baseDir = createBaseDir();
+        BrokerConfig brokerConfig = new BrokerConfig();
+        NettyServerConfig nettyServerConfig = new NettyServerConfig();
+        NettyClientConfig nettyClientConfig = new NettyClientConfig();
+        MessageStoreConfig storeConfig = new MessageStoreConfig();
+        brokerConfig.setBrokerName(BROKER_NAME_PREFIX + 
BROKER_INDEX.getAndIncrement());
+        brokerConfig.setBrokerIP1("127.0.0.1");
+        brokerConfig.setNamesrvAddr(nsAddr);
+        storeConfig.setStorePathRootDir(baseDir);
+        storeConfig.setStorePathCommitLog(baseDir + SEP + "commitlog");
+        storeConfig.setHaListenPort(8000 + random.nextInt(1000));
+        nettyServerConfig.setListenPort(10000 + random.nextInt(1000));
+        BrokerController brokerController = new BrokerController(brokerConfig, 
nettyServerConfig, nettyClientConfig, storeConfig);
+        try {
+            Assert.assertTrue(brokerController.initialize());
+            logger.info("Broker Start name:{} addr:{}", 
brokerConfig.getBrokerName(), brokerController.getBrokerAddr());
+            brokerController.start();
+        } catch (Exception e) {
+            logger.info("Broker start failed");
+            System.exit(1);
+        }
+        BROKER_CONTROLLERS.add(brokerController);
+        return brokerController;
+    }
+
+    public static void deleteFile(File file) {
+        if (!file.exists()) {
+            return;
+        }
+        if (file.isFile()) {
+            file.delete();
+        } else if (file.isDirectory()) {
+            File[] files = file.listFiles();
+            for (File file1 : files) {
+                deleteFile(file1);
+            }
+            file.delete();
+        }
+    }
+
+}


Reply via email to