http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java
 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java
new file mode 100644
index 0000000..fdafdf0
--- /dev/null
+++ 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.consumer.balance;
+
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
+import org.apache.rocketmq.test.util.MQWait;
+import org.apache.rocketmq.test.util.VerifyUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+/**
+ * Integration tests for the distribution of messages inside one consumer group when a
+ * consumer of that group is shut down between two batches of messages.
+ */
+public class NormalMsgDynamicBalanceIT extends BaseConf {
+    // Named after this class. It used to be bound to NormalMsgStaticBalanceIT, which attributed
+    // every log line written by these tests to the wrong test class.
+    private static final Logger logger = Logger.getLogger(NormalMsgDynamicBalanceIT.class);
+    private RMQNormalProducer producer = null;
+    private String topic = null;
+
+    /** Initialises the topic for the test and obtains a producer for it. */
+    @Before
+    public void setUp() {
+        topic = initTopic();
+        logger.info(String.format("use topic: %s !", topic));
+        producer = getProducer(nsAddr, topic);
+    }
+
+    /** Delegates cleanup to the base class. */
+    @After
+    public void tearDown() {
+        super.shutDown();
+    }
+
+    /**
+     * Starts two consumers in one group, sends a batch, shuts consumer2 down and sends a second
+     * batch. Asserts that every message was received and that the per-consumer counts of the
+     * first batch pass {@code VerifyUtils.verifyBalance}.
+     */
+    @Test
+    public void testTwoConsumerAndCrashOne() {
+        int msgSize = 400;
+        RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
+        RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
+            "*", new RMQNormalListner());
+
+        producer.send(msgSize);
+
+        // The result of this first wait is not asserted; completeness is checked further down,
+        // once both batches have been sent.
+        MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), consumer1.getListner(),
+            consumer2.getListner());
+        consumer2.shutdown();
+
+        producer.send(msgSize);
+        Assert.assertEquals("Not all are sent", msgSize * 2,
+            producer.getAllUndupMsgBody().size());
+
+        // consumer2's listener is still passed in: it keeps what it received before shutdown.
+        boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(),
+            consumer1.getListner(), consumer2.getListner());
+        assertThat(recvAll).isEqualTo(true);
+
+        // consumer1 is expected to have taken the whole second batch on its own (consumer2 was
+        // down), so msgSize is subtracted to compare only the shares of the first batch.
+        boolean balance = VerifyUtils.verifyBalance(msgSize,
+            VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+                consumer1.getListner().getAllUndupMsgBody()).size() - msgSize,
+            VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+                consumer2.getListner().getAllUndupMsgBody()).size());
+        assertThat(balance).isEqualTo(true);
+    }
+
+    /**
+     * Starts three consumers in one group, sends a batch, shuts consumer3 down, clears the
+     * recorded messages and sends a second batch. Asserts that the second batch was fully
+     * received by consumer1 and consumer2 and that their counts pass
+     * {@code VerifyUtils.verifyBalance}.
+     */
+    @Test
+    public void test3ConsumerAndCrashOne() {
+        int msgSize = 400;
+        RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
+        RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
+            "*", new RMQNormalListner());
+        RMQNormalConsumer consumer3 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
+            "*", new RMQNormalListner());
+
+        producer.send(msgSize);
+
+        // The result of this first wait is not asserted; only the second batch is verified.
+        MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), consumer1.getListner(),
+            consumer2.getListner(), consumer3.getListner());
+        consumer3.shutdown();
+        // Drop the records of the first batch so the checks below only see the second one.
+        producer.clearMsg();
+        consumer1.clearMsg();
+        consumer2.clearMsg();
+
+        producer.send(msgSize);
+        Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
+
+        boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(),
+            consumer1.getListner(), consumer2.getListner());
+        assertThat(recvAll).isEqualTo(true);
+
+        boolean balance = VerifyUtils.verifyBalance(msgSize,
+            VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+                consumer1.getListner().getAllUndupMsgBody()).size(),
+            VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+                consumer2.getListner().getAllUndupMsgBody()).size());
+        assertThat(balance).isEqualTo(true);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgStaticBalanceIT.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgStaticBalanceIT.java
 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgStaticBalanceIT.java
new file mode 100644
index 0000000..117d643
--- /dev/null
+++ 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgStaticBalanceIT.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.consumer.balance;
+
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
+import org.apache.rocketmq.test.util.MQWait;
+import org.apache.rocketmq.test.util.TestUtils;
+import org.apache.rocketmq.test.util.VerifyUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+/**
+ * Integration tests that check how a batch of messages is distributed over a fixed set of
+ * consumers sharing one consumer group.
+ */
+public class NormalMsgStaticBalanceIT extends BaseConf {
+    private static Logger logger = Logger.getLogger(NormalMsgStaticBalanceIT.class);
+    private RMQNormalProducer producer;
+    private String topic;
+
+    /** Initialises the topic for the test and obtains a producer for it. */
+    @Before
+    public void setUp() {
+        topic = initTopic();
+        String topicNotice = String.format("use topic: %s !", topic);
+        logger.info(topicNotice);
+        producer = getProducer(nsAddr, topic);
+    }
+
+    /** Delegates cleanup to the base class. */
+    @After
+    public void tearDown() {
+        super.shutDown();
+    }
+
+    /**
+     * Size of the result of {@code VerifyUtils.getFilterdMessage} for the producer's message
+     * bodies and the de-duplicated bodies recorded by the given consumer's listener.
+     */
+    private int receivedCount(RMQNormalConsumer consumer) {
+        return VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer.getListner().getAllUndupMsgBody()).size();
+    }
+
+    /** Two consumers in one group: all messages arrive and the counts pass verifyBalance. */
+    @Test
+    public void testTwoConsumersBalance() {
+        int total = 400;
+        RMQNormalConsumer first = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
+        RMQNormalConsumer second = getConsumer(nsAddr, first.getConsumerGroup(), topic,
+            "*", new RMQNormalListner());
+        TestUtils.waitForSeconds(waitTime);
+
+        producer.send(total);
+        Assert.assertEquals("Not all are sent", total, producer.getAllUndupMsgBody().size());
+
+        boolean allReceived = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(),
+            first.getListner(), second.getListner());
+        assertThat(allReceived).isEqualTo(true);
+
+        int firstCount = receivedCount(first);
+        int secondCount = receivedCount(second);
+        boolean balanced = VerifyUtils.verifyBalance(total, firstCount, secondCount);
+        assertThat(balanced).isEqualTo(true);
+    }
+
+    /** Four consumers in one group: all messages arrive and the counts pass verifyBalance. */
+    @Test
+    public void testFourConsumersBalance() {
+        int total = 600;
+        RMQNormalConsumer first = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
+        RMQNormalConsumer second = getConsumer(nsAddr, first.getConsumerGroup(), topic,
+            "*", new RMQNormalListner());
+        RMQNormalConsumer third = getConsumer(nsAddr, first.getConsumerGroup(), topic,
+            "*", new RMQNormalListner());
+        RMQNormalConsumer fourth = getConsumer(nsAddr, first.getConsumerGroup(), topic,
+            "*", new RMQNormalListner());
+        TestUtils.waitForSeconds(waitTime);
+
+        producer.send(total);
+        Assert.assertEquals("Not all are sent", total, producer.getAllUndupMsgBody().size());
+
+        boolean allReceived = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(),
+            first.getListner(), second.getListner(), third.getListner(), fourth.getListner());
+        assertThat(allReceived).isEqualTo(true);
+
+        int firstCount = receivedCount(first);
+        int secondCount = receivedCount(second);
+        int thirdCount = receivedCount(third);
+        int fourthCount = receivedCount(fourth);
+        boolean balanced = VerifyUtils.verifyBalance(total, firstCount, secondCount,
+            thirdCount, fourthCount);
+        assertThat(balanced).isEqualTo(true);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/BaseBroadCastIT.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/BaseBroadCastIT.java
 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/BaseBroadCastIT.java
new file mode 100644
index 0000000..94efaef
--- /dev/null
+++ 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/BaseBroadCastIT.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.consumer.broadcast;
+
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
+import org.apache.rocketmq.test.factory.ConsumerFactory;
+import org.apache.rocketmq.test.listener.AbstractListener;
+
+/**
+ * Base class for the broadcast-consumer integration tests. Provides factory helpers for
+ * {@link RMQBroadCastConsumer} instances and a helper that logs a visual separator.
+ */
+public class BaseBroadCastIT extends BaseConf {
+    private static Logger logger = Logger.getLogger(BaseBroadCastIT.class);
+
+    /**
+     * Obtains a broadcast consumer whose consumer group comes from {@code initConsumerGroup()}.
+     *
+     * @param nsAddr        address passed through to the consumer factory
+     * @param topic         topic to subscribe to
+     * @param subExpression subscription expression
+     * @param listner       listener attached to the consumer
+     * @return the consumer produced by the five-argument overload
+     */
+    public static RMQBroadCastConsumer getBroadCastConsumer(String nsAddr, String topic,
+        String subExpression, AbstractListener listner) {
+        return getBroadCastConsumer(nsAddr, initConsumerGroup(), topic, subExpression, listner);
+    }
+
+    /**
+     * Obtains a broadcast consumer from {@code ConsumerFactory} for the given group, switches
+     * it to debug, adds it to {@code mqClients} and logs that it was started.
+     *
+     * @param nsAddr        address passed through to the consumer factory
+     * @param consumerGroup consumer group of the new consumer
+     * @param topic         topic to subscribe to
+     * @param subExpression subscription expression
+     * @param listner       listener attached to the consumer
+     * @return the newly obtained consumer
+     */
+    public static RMQBroadCastConsumer getBroadCastConsumer(String nsAddr, String consumerGroup,
+        String topic, String subExpression, AbstractListener listner) {
+        RMQBroadCastConsumer broadcastConsumer = ConsumerFactory.getRMQBroadCastConsumer(nsAddr,
+            consumerGroup, topic, subExpression, listner);
+        broadcastConsumer.setDebug();
+        mqClients.add(broadcastConsumer);
+
+        String startedNotice = String.format("consumer[%s] start,topic[%s],subExpression[%s]",
+            consumerGroup, topic, subExpression);
+        logger.info(startedNotice);
+        return broadcastConsumer;
+    }
+
+    /** Writes the separator line to the log three times. */
+    public void printSeperator() {
+        final String separator =
+            "<<<<<<<<================================================================================>>>>>>>>";
+        int remaining = 3;
+        while (remaining-- > 0) {
+            logger.info(separator);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgNotRecvIT.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgNotRecvIT.java
 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgNotRecvIT.java
new file mode 100644
index 0000000..65762fa
--- /dev/null
+++ 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgNotRecvIT.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.consumer.broadcast.normal;
+
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT;
+import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
+import org.apache.rocketmq.test.util.VerifyUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+/**
+ * Broadcast integration test: a consumer that joins a group after the messages were already
+ * consumed by another member must not receive them.
+ */
+public class BroadCastNormalMsgNotRecvIT extends BaseBroadCastIT {
+    // Named after this class. It used to be bound to NormalMsgTwoSameGroupConsumerIT, which
+    // attributed every log line written by this test to an unrelated test class.
+    private static final Logger logger = Logger.getLogger(BroadCastNormalMsgNotRecvIT.class);
+    private RMQNormalProducer producer = null;
+    private String topic = null;
+
+    /** Logs a separator, initialises the topic and obtains a producer for it. */
+    @Before
+    public void setUp() {
+        printSeperator();
+        topic = initTopic();
+        logger.info(String.format("use topic: %s;", topic));
+        producer = getProducer(nsAddr, topic);
+    }
+
+    /** Delegates cleanup to the base class. */
+    @After
+    public void tearDown() {
+        super.shutDown();
+    }
+
+    /**
+     * consumer1 consumes the whole batch; consumer2 is then started in the same group and is
+     * asserted to have recorded no message after waiting.
+     */
+    @Test
+    public void testNotConsumeAfterConsume() {
+        int msgSize = 16;
+
+        String group = initConsumerGroup();
+        RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, group, topic, "*",
+            new RMQNormalListner(group + "_1"));
+
+        producer.send(msgSize);
+        Assert.assertEquals("Not all sent succeeded", msgSize,
+            producer.getAllUndupMsgBody().size());
+
+        consumer1.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer1.getListner().getAllMsgBody()))
+            .containsExactlyElementsIn(producer.getAllMsgBody());
+
+        // Started only after consumer1 has consumed everything.
+        RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr,
+            consumer1.getConsumerGroup(), topic, "*", new RMQNormalListner(group + "_2"));
+        // Uses waitTime rather than consumeTime: nothing is expected to arrive.
+        consumer2.getListner().waitForMessageConsume(producer.getAllMsgBody(), waitTime);
+        assertThat(consumer2.getListner().getAllMsgBody().size()).isEqualTo(0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgRecvCrashIT.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgRecvCrashIT.java
 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgRecvCrashIT.java
new file mode 100644
index 0000000..b878d09
--- /dev/null
+++ 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgRecvCrashIT.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.consumer.broadcast.normal;
+
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT;
+import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
+import org.apache.rocketmq.test.util.TestUtils;
+import org.apache.rocketmq.test.util.VerifyUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+/**
+ * Broadcast integration test: after one of two consumers in a group is shut down, the
+ * remaining consumer must still receive every newly sent message.
+ */
+public class BroadCastNormalMsgRecvCrashIT extends BaseBroadCastIT {
+    // Named after this class. It used to be bound to NormalMsgTwoSameGroupConsumerIT, which
+    // attributed every log line written by this test to an unrelated test class.
+    private static final Logger logger = Logger.getLogger(BroadCastNormalMsgRecvCrashIT.class);
+    private RMQNormalProducer producer = null;
+    private String topic = null;
+
+    /** Logs a separator, initialises the topic and obtains a producer for it. */
+    @Before
+    public void setUp() {
+        printSeperator();
+        topic = initTopic();
+        logger.info(String.format("use topic: %s;", topic));
+        producer = getProducer(nsAddr, topic);
+    }
+
+    /** Delegates cleanup to the base class. */
+    @After
+    public void tearDown() {
+        super.shutDown();
+    }
+
+    /**
+     * Both consumers are asserted to receive the full first batch. consumer2 is then shut
+     * down, the recorded messages are cleared, and consumer1 is asserted to receive the full
+     * second batch.
+     */
+    @Test
+    public void testStartTwoAndCrashOneLater() {
+        int msgSize = 16;
+
+        String group = initConsumerGroup();
+        RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, group, topic, "*",
+            new RMQNormalListner(group + "_1"));
+        RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr,
+            consumer1.getConsumerGroup(), topic, "*", new RMQNormalListner(group + "_2"));
+        TestUtils.waitForSeconds(waitTime);
+
+        producer.send(msgSize);
+        Assert.assertEquals("Not all sent succeeded", msgSize,
+            producer.getAllUndupMsgBody().size());
+
+        consumer1.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
+        consumer2.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer1.getListner().getAllMsgBody()))
+            .containsExactlyElementsIn(producer.getAllMsgBody());
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer2.getListner().getAllMsgBody()))
+            .containsExactlyElementsIn(producer.getAllMsgBody());
+
+        consumer2.shutdown();
+
+        // Drop the records of the first batch so the check below only sees the second one.
+        producer.clearMsg();
+        consumer1.clearMsg();
+
+        producer.send(msgSize);
+        Assert.assertEquals("Not all sent succeeded", msgSize,
+            producer.getAllUndupMsgBody().size());
+
+        consumer1.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer1.getListner().getAllMsgBody()))
+            .containsExactlyElementsIn(producer.getAllMsgBody());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgRecvFailIT.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgRecvFailIT.java
 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgRecvFailIT.java
new file mode 100644
index 0000000..26c37f9
--- /dev/null
+++ 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgRecvFailIT.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.consumer.broadcast.normal;
+
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT;
+import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
+import org.apache.rocketmq.test.util.VerifyUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+/**
+ * Broadcast integration test: a consumer in the group whose listener answers
+ * {@code RECONSUME_LATER} must not prevent the other consumer from receiving every message.
+ */
+public class BroadCastNormalMsgRecvFailIT extends BaseBroadCastIT {
+    // Named after this class. It used to be bound to NormalMsgTwoSameGroupConsumerIT, which
+    // attributed every log line written by this test to an unrelated test class.
+    private static final Logger logger = Logger.getLogger(BroadCastNormalMsgRecvFailIT.class);
+    private RMQNormalProducer producer = null;
+    private String topic = null;
+
+    /** Logs a separator, initialises the topic and obtains a producer for it. */
+    @Before
+    public void setUp() {
+        printSeperator();
+        topic = initTopic();
+        logger.info(String.format("use topic: %s;", topic));
+        producer = getProducer(nsAddr, topic);
+    }
+
+    /** Delegates cleanup to the base class. */
+    @After
+    public void tearDown() {
+        super.shutDown();
+    }
+
+    /**
+     * Starts consumer1 with a default listener and consumer2, in the same group, with a
+     * listener constructed with {@code RECONSUME_LATER}. Asserts that consumer1 receives the
+     * full batch.
+     */
+    @Test
+    public void testStartTwoConsumerAndOneConsumerFail() {
+        int msgSize = 16;
+
+        RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, topic, "*",
+            new RMQNormalListner());
+        // consumer2 is never inspected afterwards; only consumer1's records are verified.
+        RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr,
+            consumer1.getConsumerGroup(), topic, "*",
+            new RMQNormalListner(ConsumeConcurrentlyStatus.RECONSUME_LATER));
+
+        producer.send(msgSize);
+        Assert.assertEquals("Not all sent succeeded", msgSize,
+            producer.getAllUndupMsgBody().size());
+
+        consumer1.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
+
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer1.getListner().getAllMsgBody()))
+            .containsExactlyElementsIn(producer.getAllMsgBody());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgRecvStartLaterIT.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgRecvStartLaterIT.java
 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgRecvStartLaterIT.java
new file mode 100644
index 0000000..027f648
--- /dev/null
+++ 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgRecvStartLaterIT.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.consumer.broadcast.normal;
+
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT;
+import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
+import org.apache.rocketmq.test.util.TestUtils;
+import org.apache.rocketmq.test.util.VerifyUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+/**
+ * Broadcast integration test: a consumer started later in the same group must, together with
+ * the existing consumer, receive every message sent after it was started.
+ */
+public class BroadCastNormalMsgRecvStartLaterIT extends BaseBroadCastIT {
+    // Named after this class. It used to be bound to NormalMsgTwoSameGroupConsumerIT, which
+    // attributed every log line written by this test to an unrelated test class.
+    private static final Logger logger =
+        Logger.getLogger(BroadCastNormalMsgRecvStartLaterIT.class);
+    private RMQNormalProducer producer = null;
+    private String topic = null;
+
+    /** Logs a separator, initialises the topic and obtains a producer for it. */
+    @Before
+    public void setUp() {
+        printSeperator();
+        topic = initTopic();
+        logger.info(String.format("use topic: %s;", topic));
+        producer = getProducer(nsAddr, topic);
+    }
+
+    /** Delegates cleanup to the base class. */
+    @After
+    public void tearDown() {
+        super.shutDown();
+    }
+
+    /**
+     * consumer1 is asserted to receive the full first batch. The recorded messages are then
+     * cleared, consumer2 is started in the same group, and both consumers are asserted to
+     * receive the full second batch.
+     */
+    @Test
+    public void testStartOneAndStartAnotherLater() {
+        int msgSize = 16;
+
+        String group = initConsumerGroup();
+        RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, group, topic, "*",
+            new RMQNormalListner(group + "_1"));
+        TestUtils.waitForSeconds(waitTime);
+
+        producer.send(msgSize);
+        Assert.assertEquals("Not all sent succeeded", msgSize,
+            producer.getAllUndupMsgBody().size());
+
+        consumer1.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer1.getListner().getAllMsgBody()))
+            .containsExactlyElementsIn(producer.getAllMsgBody());
+
+        // Drop the records of the first batch so the checks below only see the second one.
+        producer.clearMsg();
+        consumer1.clearMsg();
+
+        RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr,
+            consumer1.getConsumerGroup(), topic, "*", new RMQNormalListner(group + "_2"));
+        TestUtils.waitForSeconds(waitTime);
+        producer.send(msgSize);
+        Assert.assertEquals("Not all sent succeeded", msgSize,
+            producer.getAllUndupMsgBody().size());
+
+        consumer1.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
+        consumer2.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer1.getListner().getAllMsgBody()))
+            .containsExactlyElementsIn(producer.getAllMsgBody());
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer2.getListner().getAllMsgBody()))
+            .containsExactlyElementsIn(producer.getAllMsgBody());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgTwoDiffGroupRecvIT.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgTwoDiffGroupRecvIT.java
 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgTwoDiffGroupRecvIT.java
new file mode 100644
index 0000000..acbaf23
--- /dev/null
+++ 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgTwoDiffGroupRecvIT.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.consumer.broadcast.normal;
+
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT;
+import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
+import org.apache.rocketmq.test.util.TestUtils;
+import org.apache.rocketmq.test.util.VerifyUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+/** Broadcast test: consumers in two different groups must each receive every sent message. */
+public class BroadCastNormalMsgTwoDiffGroupRecvIT extends BaseBroadCastIT {
+    private static final Logger logger = Logger.getLogger(BroadCastNormalMsgTwoDiffGroupRecvIT.class);
+    private RMQNormalProducer producer = null;
+    private String topic = null;
+
+    /** Prints a separator, obtains a topic via initTopic(), logs it and creates a producer for it. */
+    @Before
+    public void setUp() {
+        printSeperator();
+        topic = initTopic();
+        logger.info(String.format("use topic: %s;", topic));
+        producer = getProducer(nsAddr, topic);
+    }
+
+    /** Invokes the inherited shutDown() after each test. */
+    @After
+    public void tearDown() {
+        super.shutDown();
+    }
+
+    /** Sends 16 messages and asserts that one consumer per group each got exactly those messages. */
+    @Test
+    public void testStartDiffSameGroupConsumer() {
+        int msgSize = 16;
+        String group1 = initConsumerGroup();
+        String group2 = initConsumerGroup();
+        RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, group1, topic, "*",
+            new RMQNormalListner(group1 + "_1"));
+        RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr, group2, topic, "*",
+            new RMQNormalListner(group2 + "_2"));
+        TestUtils.waitForSeconds(waitTime);
+        producer.send(msgSize);
+        Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size());
+        consumer1.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
+        consumer2.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
+
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer1.getListner().getAllMsgBody()))
+            .containsExactlyElementsIn(producer.getAllMsgBody());
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer2.getListner().getAllMsgBody()))
+            .containsExactlyElementsIn(producer.getAllMsgBody());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/NormalMsgTwoSameGroupConsumerIT.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/NormalMsgTwoSameGroupConsumerIT.java
 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/NormalMsgTwoSameGroupConsumerIT.java
new file mode 100644
index 0000000..984c941
--- /dev/null
+++ 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/NormalMsgTwoSameGroupConsumerIT.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.consumer.broadcast.normal;
+
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT;
+import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
+import org.apache.rocketmq.test.util.TestUtils;
+import org.apache.rocketmq.test.util.VerifyUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+/** Broadcast test in which two consumers share one consumer group. */
+public class NormalMsgTwoSameGroupConsumerIT extends BaseBroadCastIT {
+    private static Logger logger = Logger.getLogger(NormalMsgTwoSameGroupConsumerIT.class);
+    private RMQNormalProducer producer = null;
+    private String topic = null;
+
+    /** Prints a separator, obtains a topic via initTopic(), logs it and creates a producer for it. */
+    @Before
+    public void setUp() {
+        printSeperator();
+        topic = initTopic();
+        logger.info(String.format("use topic: %s;", topic));
+        producer = getProducer(nsAddr, topic);
+    }
+
+    /** Invokes the inherited shutDown() after each test. */
+    @After
+    public void tearDown() {
+        super.shutDown();
+    }
+
+    /** Sends 16 messages and asserts both same-group consumers got exactly those messages. */
+    @Test
+    public void testStartTwoSameGroupConsumer() {
+        int messageCount = 16;
+        String consumerGroup = initConsumerGroup();
+        RMQBroadCastConsumer firstConsumer = getBroadCastConsumer(nsAddr, consumerGroup, topic, "*",
+            new RMQNormalListner(consumerGroup + "_1"));
+        RMQBroadCastConsumer secondConsumer = getBroadCastConsumer(nsAddr, firstConsumer.getConsumerGroup(),
+            topic, "*", new RMQNormalListner(consumerGroup + "_2"));
+        TestUtils.waitForSeconds(waitTime);
+
+        producer.send(messageCount);
+        Assert.assertEquals("Not all are sent", messageCount, producer.getAllUndupMsgBody().size());
+
+        firstConsumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
+        secondConsumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
+
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            firstConsumer.getListner().getAllMsgBody())).containsExactlyElementsIn(producer.getAllMsgBody());
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            secondConsumer.getListner().getAllMsgBody())).containsExactlyElementsIn(producer.getAllMsgBody());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/order/OrderMsgBroadCastIT.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/order/OrderMsgBroadCastIT.java
 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/order/OrderMsgBroadCastIT.java
new file mode 100644
index 0000000..ac8fcf5
--- /dev/null
+++ 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/order/OrderMsgBroadCastIT.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.consumer.broadcast.order;
+
+import java.util.List;
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT;
+import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.listener.rmq.order.RMQOrderListener;
+import org.apache.rocketmq.test.message.MessageQueueMsg;
+import org.apache.rocketmq.test.util.TestUtils;
+import org.apache.rocketmq.test.util.VerifyUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+/** Broadcast test that checks message order, via VerifyUtils.verifyOrder, for two same-group consumers. */
+public class OrderMsgBroadCastIT extends BaseBroadCastIT {
+    private static Logger logger = Logger.getLogger(OrderMsgBroadCastIT.class);
+    private RMQNormalProducer producer = null;
+    private String topic = null;
+
+    /** Obtains a topic via initTopic(), logs it and creates a producer for it. */
+    @Before
+    public void setUp() {
+        topic = initTopic();
+        logger.info(String.format("use topic: %s;", topic));
+        producer = getProducer(nsAddr, topic);
+    }
+
+    /** Invokes the inherited shutDown() after each test. */
+    @After
+    public void tearDown() {
+        super.shutDown();
+    }
+
+    /** Sends messages built per producer queue and asserts verifyOrder holds for both listeners. */
+    @Test
+    public void testTwoConsumerSubTag() {
+        int messageCount = 10;
+        RMQBroadCastConsumer firstConsumer = getBroadCastConsumer(nsAddr, topic, "*", new RMQOrderListener());
+        RMQBroadCastConsumer secondConsumer = getBroadCastConsumer(nsAddr,
+            firstConsumer.getConsumerGroup(), topic, "*", new RMQOrderListener());
+        TestUtils.waitForSeconds(waitTime);
+
+        List<MessageQueue> queues = producer.getMessageQueue();
+        producer.send(new MessageQueueMsg(queues, messageCount).getMsgsWithMQ());
+        firstConsumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
+        secondConsumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
+
+        RMQOrderListener firstListener = (RMQOrderListener) firstConsumer.getListner();
+        assertThat(VerifyUtils.verifyOrder(firstListener.getMsgs())).isEqualTo(true);
+        RMQOrderListener secondListener = (RMQOrderListener) secondConsumer.getListner();
+        assertThat(VerifyUtils.verifyOrder(secondListener.getMsgs())).isEqualTo(true);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/tag/BroadCastTwoConsumerFilterIT.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/tag/BroadCastTwoConsumerFilterIT.java
 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/tag/BroadCastTwoConsumerFilterIT.java
new file mode 100644
index 0000000..a1a2ff7
--- /dev/null
+++ 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/tag/BroadCastTwoConsumerFilterIT.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.consumer.broadcast.tag;
+
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT;
+import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
+import org.apache.rocketmq.test.util.TestUtils;
+import org.apache.rocketmq.test.util.VerifyUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+/** Broadcast tag-filter test: two consumers subscribed to tag1 must receive all tag1 messages. */
+public class BroadCastTwoConsumerFilterIT extends BaseBroadCastIT {
+    private static final Logger logger = Logger.getLogger(BroadCastTwoConsumerFilterIT.class);
+    private RMQNormalProducer producer = null;
+    private String topic = null;
+
+    /** Obtains a topic via initTopic(), logs it and creates a producer for it. */
+    @Before
+    public void setUp() {
+        topic = initTopic();
+        logger.info(String.format("use topic: %s;", topic));
+        producer = getProducer(nsAddr, topic);
+    }
+
+    /** Invokes the inherited shutDown() after each test. */
+    @After
+    public void tearDown() {
+        super.shutDown();
+    }
+
+    /** Sends a tag2 batch, then a tag1 batch, and asserts both consumers got exactly the tag1 batch. */
+    @Test
+    public void testTwoConsumerFilter() {
+        int msgSize = 40;
+        String tag1 = "jueyin_tag_1";
+        String tag2 = "jueyin_tag_2";
+        RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, topic, tag1, new RMQNormalListner());
+        RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr,
+            consumer1.getConsumerGroup(), topic, tag1, new RMQNormalListner());
+        TestUtils.waitForSeconds(waitTime);
+        // tag2 is not subscribed; clearMsg() presumably resets the producer's sent record before tag1 is sent.
+        producer.send(tag2, msgSize);
+        Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size());
+        producer.clearMsg();
+        producer.send(tag1, msgSize);
+
+        consumer1.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
+        consumer2.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
+
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer1.getListner().getAllMsgBody())).containsExactlyElementsIn(producer.getAllMsgBody());
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer2.getListner().getAllMsgBody())).containsExactlyElementsIn(producer.getAllMsgBody());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/tag/BroadCastTwoConsumerSubDiffTagIT.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/tag/BroadCastTwoConsumerSubDiffTagIT.java
 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/tag/BroadCastTwoConsumerSubDiffTagIT.java
new file mode 100644
index 0000000..5c2e7fc
--- /dev/null
+++ 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/tag/BroadCastTwoConsumerSubDiffTagIT.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.consumer.broadcast.tag;
+
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT;
+import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
+import org.apache.rocketmq.test.util.TestUtils;
+import org.apache.rocketmq.test.util.VerifyUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+/** Broadcast test: same-group consumers subscribed with "*" and with a tag must both get tagged messages. */
+public class BroadCastTwoConsumerSubDiffTagIT extends BaseBroadCastIT {
+    private static final Logger logger = Logger.getLogger(BroadCastTwoConsumerSubDiffTagIT.class);
+    private RMQNormalProducer producer = null;
+    private String topic = null;
+
+    /** Obtains a topic via initTopic(), logs it and creates a producer for it. */
+    @Before
+    public void setUp() {
+        topic = initTopic();
+        logger.info(String.format("use topic: %s;", topic));
+        producer = getProducer(nsAddr, topic);
+    }
+
+    /** Invokes the inherited shutDown() after each test. */
+    @After
+    public void tearDown() {
+        super.shutDown();
+    }
+
+    /** Sends 40 tagged messages and asserts the "*" consumer and the tag consumer each got all of them. */
+    @Test
+    public void testTwoConsumerSubDiffTag() {
+        int msgSize = 40;
+        String tag = "jueyin_tag";
+        RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, topic, "*", new RMQNormalListner());
+        RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr,
+            consumer1.getConsumerGroup(), topic, tag, new RMQNormalListner());
+        TestUtils.waitForSeconds(waitTime);
+
+        producer.send(tag, msgSize);
+        Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size());
+
+        consumer1.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
+        consumer2.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
+
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer1.getListner().getAllMsgBody())).containsExactlyElementsIn(producer.getAllMsgBody());
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer2.getListner().getAllMsgBody())).containsExactlyElementsIn(producer.getAllMsgBody());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/tag/BroadCastTwoConsumerSubTagIT.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/tag/BroadCastTwoConsumerSubTagIT.java
 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/tag/BroadCastTwoConsumerSubTagIT.java
new file mode 100644
index 0000000..e4510de
--- /dev/null
+++ 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/tag/BroadCastTwoConsumerSubTagIT.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.consumer.broadcast.tag;
+
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT;
+import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
+import org.apache.rocketmq.test.util.TestUtils;
+import org.apache.rocketmq.test.util.VerifyUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+/** Broadcast test: two same-group consumers subscribed to one tag must both get all tagged messages. */
+public class BroadCastTwoConsumerSubTagIT extends BaseBroadCastIT {
+    private static Logger logger = Logger.getLogger(BroadCastTwoConsumerSubTagIT.class);
+    private RMQNormalProducer producer = null;
+    private String topic = null;
+
+    /** Obtains a topic via initTopic(), logs it and creates a producer for it. */
+    @Before
+    public void setUp() {
+        topic = initTopic();
+        logger.info(String.format("use topic: %s;", topic));
+        producer = getProducer(nsAddr, topic);
+    }
+
+    /** Invokes the inherited shutDown() after each test. */
+    @After
+    public void tearDown() {
+        super.shutDown();
+    }
+
+    /** Sends 20 tagged messages and asserts both tag subscribers got exactly those messages. */
+    @Test
+    public void testTwoConsumerSubTag() {
+        int messageCount = 20;
+        String subscribedTag = "jueyin_tag";
+        RMQBroadCastConsumer firstConsumer =
+            getBroadCastConsumer(nsAddr, topic, subscribedTag, new RMQNormalListner());
+        RMQBroadCastConsumer secondConsumer = getBroadCastConsumer(nsAddr,
+            firstConsumer.getConsumerGroup(), topic, subscribedTag, new RMQNormalListner());
+        TestUtils.waitForSeconds(waitTime);
+
+        producer.send(subscribedTag, messageCount);
+        Assert.assertEquals("Not all sent succeeded", messageCount, producer.getAllUndupMsgBody().size());
+        firstConsumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
+        secondConsumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
+
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            firstConsumer.getListner().getAllMsgBody())).containsExactlyElementsIn(producer.getAllMsgBody());
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            secondConsumer.getListner().getAllMsgBody())).containsExactlyElementsIn(producer.getAllMsgBody());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/consumer/cluster/DynamicAddAndCrashIT.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/client/consumer/cluster/DynamicAddAndCrashIT.java
 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/cluster/DynamicAddAndCrashIT.java
new file mode 100644
index 0000000..303dfa0
--- /dev/null
+++ 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/cluster/DynamicAddAndCrashIT.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.consumer.cluster;
+
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.test.base.BaseConf;
+import 
org.apache.rocketmq.test.client.consumer.balance.NormalMsgStaticBalanceIT;
+import org.apache.rocketmq.test.client.mq.MQAsyncProducer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
+import org.apache.rocketmq.test.util.MQWait;
+import org.apache.rocketmq.test.util.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+/** Tests that add consumers to a group while an MQAsyncProducer runs and then shut them down. */
+public class DynamicAddAndCrashIT extends BaseConf {
+    private static final Logger logger = Logger.getLogger(DynamicAddAndCrashIT.class);
+    private RMQNormalProducer producer = null;
+    private String topic = null;
+
+    /** Obtains a topic via initTopic(), logs it and creates a producer for it. */
+    @Before
+    public void setUp() {
+        topic = initTopic();
+        logger.info(String.format("use topic: %s !", topic));
+        producer = getProducer(nsAddr, topic);
+    }
+
+    /** Invokes the inherited shutDown() after each test. */
+    @After
+    public void tearDown() {
+        super.shutDown();
+    }
+
+    /** Adds a second consumer mid-production, shuts it down, then expects every message consumed. */
+    @Test
+    public void testAddOneConsumerAndCrashAfterWhile() {
+        int msgSize = 150;
+        RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
+        MQAsyncProducer asyncDefaultMQProducer = new MQAsyncProducer(producer, msgSize, 100);
+        asyncDefaultMQProducer.start();
+        TestUtils.waitForSeconds(waitTime);
+
+        RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
+            "*", new RMQNormalListner());
+        TestUtils.waitForSeconds(waitTime);
+        consumer2.shutdown();
+
+        asyncDefaultMQProducer.waitSendAll(waitTime * 6);
+        // The first wait's result is discarded; the second call below provides the asserted outcome.
+        MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), consumer1.getListner(),
+            consumer2.getListner());
+        boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(),
+            consumer1.getListner(), consumer2.getListner());
+        assertThat(recvAll).isEqualTo(true);
+    }
+
+    /** Adds two consumers mid-production, shuts both down, then expects every message consumed. */
+    @Test
+    public void testAddTwoConsumerAndCrashAfterWhile() {
+        int msgSize = 150;
+        RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
+        MQAsyncProducer asyncDefaultMQProducer = new MQAsyncProducer(producer, msgSize, 100);
+        asyncDefaultMQProducer.start();
+        TestUtils.waitForSeconds(waitTime);
+
+        RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
+            "*", new RMQNormalListner());
+        RMQNormalConsumer consumer3 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
+            "*", new RMQNormalListner());
+        TestUtils.waitForSeconds(waitTime);
+        consumer2.shutdown();
+        consumer3.shutdown();
+
+        asyncDefaultMQProducer.waitSendAll(waitTime * 6);
+        // The first wait's result is discarded; the second call below provides the asserted outcome.
+        MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), consumer1.getListner(),
+            consumer2.getListner(), consumer3.getListner());
+        boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(),
+            consumer1.getListner(), consumer2.getListner(), consumer3.getListner());
+        assertThat(recvAll).isEqualTo(true);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/consumer/cluster/DynamicAddConsumerIT.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/client/consumer/cluster/DynamicAddConsumerIT.java
 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/cluster/DynamicAddConsumerIT.java
new file mode 100644
index 0000000..46dbb70
--- /dev/null
+++ 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/cluster/DynamicAddConsumerIT.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.consumer.cluster;
+
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.test.base.BaseConf;
+import 
org.apache.rocketmq.test.client.consumer.balance.NormalMsgStaticBalanceIT;
+import org.apache.rocketmq.test.client.mq.MQAsyncProducer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
+import org.apache.rocketmq.test.util.MQWait;
+import org.apache.rocketmq.test.util.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+/** Tests that add consumers to an existing group while an MQAsyncProducer is still sending. */
+public class DynamicAddConsumerIT extends BaseConf {
+    private static final Logger logger = Logger.getLogger(DynamicAddConsumerIT.class);
+    private RMQNormalProducer producer = null;
+    private String topic = null;
+
+    /** Obtains a topic via initTopic(), logs it and creates a producer for it. */
+    @Before
+    public void setUp() {
+        topic = initTopic();
+        logger.info(String.format("use topic: %s !", topic));
+        producer = getProducer(nsAddr, topic);
+    }
+
+    /** Invokes the inherited shutDown() after each test. */
+    @After
+    public void tearDown() {
+        super.shutDown();
+    }
+
+    /** Adds a second consumer mid-production and expects the two listeners to consume every message. */
+    @Test
+    public void testAddOneConsumer() {
+        int msgSize = 100;
+        RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
+        MQAsyncProducer asyncDefaultMQProducer = new MQAsyncProducer(producer, msgSize, 100);
+        asyncDefaultMQProducer.start();
+        TestUtils.waitForSeconds(waitTime);
+
+        RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
+            "*", new RMQNormalListner());
+        asyncDefaultMQProducer.waitSendAll(waitTime * 6);
+
+        // The first wait's result is discarded; the second call below provides the asserted outcome.
+        MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), consumer1.getListner(),
+            consumer2.getListner());
+        boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(),
+            consumer1.getListner(), consumer2.getListner());
+        assertThat(recvAll).isEqualTo(true);
+    }
+
+    /** Adds two consumers mid-production and expects the three listeners to consume every message. */
+    @Test
+    public void testAddTwoConsumer() {
+        int msgSize = 100;
+        RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
+        MQAsyncProducer asyncDefaultMQProducer = new MQAsyncProducer(producer, msgSize, 100);
+        asyncDefaultMQProducer.start();
+        TestUtils.waitForSeconds(waitTime);
+
+        RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
+            "*", new RMQNormalListner());
+        RMQNormalConsumer consumer3 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
+            "*", new RMQNormalListner());
+        asyncDefaultMQProducer.waitSendAll(waitTime * 6);
+        // The first wait's result is discarded; the second call below provides the asserted outcome.
+        MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), consumer1.getListner(),
+            consumer2.getListner(), consumer3.getListner());
+        boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(),
+            consumer1.getListner(), consumer2.getListner(), consumer3.getListner());
+        assertThat(recvAll).isEqualTo(true);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/consumer/cluster/DynamicCrashConsumerIT.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/rocketmq/test/client/consumer/cluster/DynamicCrashConsumerIT.java
 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/cluster/DynamicCrashConsumerIT.java
new file mode 100644
index 0000000..807f950
--- /dev/null
+++ 
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/cluster/DynamicCrashConsumerIT.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.consumer.cluster;
+
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.test.base.BaseConf;
+import 
org.apache.rocketmq.test.client.consumer.balance.NormalMsgStaticBalanceIT;
+import org.apache.rocketmq.test.client.mq.MQAsyncProducer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
+import org.apache.rocketmq.test.util.MQWait;
+import org.apache.rocketmq.test.util.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+/**
+ * Integration tests that shut down some consumers of a consumer group after an asynchronous
+ * producer has been started, and then assert that {@code MQWait.waitConsumeAll} returns
+ * {@code true} for the produced message bodies across the listeners of all consumers.
+ *
+ * <p>NOTE(review): the test method names say "Add", but the bodies shut consumers down; the
+ * names look copied from the dynamic-add tests and are kept unchanged here.
+ */
+public class DynamicCrashConsumerIT extends BaseConf {
+    // Registered under this class so that log output is attributed to the right test.
+    private static Logger logger = Logger.getLogger(DynamicCrashConsumerIT.class);
+    private RMQNormalProducer producer = null;
+    private String topic = null;
+
+    /** Initialises a topic and a producer for it before every test. */
+    @Before
+    public void setUp() {
+        topic = initTopic();
+        logger.info(String.format("use topic: %s !", topic));
+        producer = getProducer(nsAddr, topic);
+    }
+
+    /** Delegates to {@code BaseConf.shutDown()} after every test. */
+    @After
+    public void tearDown() {
+        super.shutDown();
+    }
+
+    /**
+     * Starts two consumers in one group, shuts the second one down after the producer has
+     * been started, and asserts that the wait for all produced bodies succeeds.
+     */
+    @Test
+    public void testAddOneConsumer() {
+        int msgSize = 100;
+        RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
+        RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
+            "*", new RMQNormalListner());
+
+        // NOTE(review): the third constructor argument is presumably a pacing value for the
+        // sender; confirm against MQAsyncProducer.
+        MQAsyncProducer asyncDefaultMQProducer = new MQAsyncProducer(producer, msgSize, 100);
+        asyncDefaultMQProducer.start();
+        TestUtils.waitForSeconds(waitTime);
+
+        consumer2.shutdown();
+
+        asyncDefaultMQProducer.waitSendAll(waitTime * 6);
+
+        // The listener of the stopped consumer is still passed in, so anything it received
+        // before the shutdown is part of the check. The result of this first wait is
+        // discarded; only the second call feeds the assertion.
+        MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), consumer1.getListner(),
+            consumer2.getListner());
+
+        boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(),
+            consumer1.getListner(), consumer2.getListner());
+        assertThat(recvAll).isEqualTo(true);
+    }
+
+    /**
+     * Starts three consumers in one group, shuts the second and third down after the
+     * producer has been started, and asserts that the wait for all produced bodies succeeds.
+     */
+    @Test
+    public void testAddTwoConsumer() {
+        int msgSize = 100;
+        RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
+        RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
+            "*", new RMQNormalListner());
+        RMQNormalConsumer consumer3 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic,
+            "*", new RMQNormalListner());
+
+        MQAsyncProducer asyncDefaultMQProducer = new MQAsyncProducer(producer, msgSize, 100);
+        asyncDefaultMQProducer.start();
+        TestUtils.waitForSeconds(waitTime);
+
+        consumer2.shutdown();
+        consumer3.shutdown();
+
+        asyncDefaultMQProducer.waitSendAll(waitTime * 6);
+
+        // The result of this first wait is discarded; only the second call feeds the assertion.
+        MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), consumer1.getListner(),
+            consumer2.getListner(), consumer3.getListner());
+
+        boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(),
+            consumer1.getListner(), consumer2.getListner(), consumer3.getListner());
+        assertThat(recvAll).isEqualTo(true);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/MulTagSubIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/MulTagSubIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/MulTagSubIT.java
new file mode 100644
index 0000000..ecb204e
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/MulTagSubIT.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.consumer.tag;
+
+import java.util.List;
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.factory.MQMessageFactory;
+import org.apache.rocketmq.test.factory.TagMessage;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
+import org.apache.rocketmq.test.util.TestUtils;
+import org.apache.rocketmq.test.util.VerifyUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+/**
+ * Integration tests for consumers whose subscription expression names several tags joined
+ * with {@code ||}. Each test sends tagged messages and compares the consumed bodies, passed
+ * through {@code VerifyUtils.getFilterdMessage}, with the bodies expected for that
+ * subscription.
+ */
+public class MulTagSubIT extends BaseConf {
+    // Registered under this class so that log output is attributed to the right test.
+    private static Logger logger = Logger.getLogger(MulTagSubIT.class);
+    private RMQNormalProducer producer = null;
+    private String topic = null;
+
+    /** Initialises a topic, a consumer group and a producer before every test. */
+    @Before
+    public void setUp() {
+        topic = initTopic();
+        String consumerId = initConsumerGroup();
+        logger.info(String.format("use topic: %s; consumerId: %s !", topic, consumerId));
+        producer = getProducer(nsAddr, topic);
+    }
+
+    /** Delegates to {@code BaseConf.shutDown()} after every test. */
+    @After
+    public void tearDown() {
+        super.shutDown();
+    }
+
+    /**
+     * Subscribes to {@code jueyin1||jueyin2}, sends only {@code jueyin1} messages and expects
+     * every sent body to be consumed.
+     */
+    @Test
+    public void testSubTwoTabMessageOnsTag() {
+        String tag = "jueyin1";
+        String subExpress = String.format("%s||jueyin2", tag);
+        int msgSize = 10;
+        RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress,
+            new RMQNormalListner());
+        producer.send(tag, msgSize);
+        Assert.assertEquals("Not all sent succeeded", msgSize,
+            producer.getAllUndupMsgBody().size());
+        consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
+
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer.getListner().getAllMsgBody()))
+            .containsExactlyElementsIn(producer.getAllMsgBody());
+    }
+
+    /**
+     * Subscribes to {@code jueyin2||noExistTag}, sends {@code jueyin1} and {@code jueyin2}
+     * messages and expects exactly the {@code jueyin2} bodies to be consumed.
+     */
+    @Test
+    public void testSubTwoTabAndMatchOne() {
+        String tag1 = "jueyin1";
+        String tag2 = "jueyin2";
+        String subExpress = String.format("%s||noExistTag", tag2);
+        int msgSize = 10;
+        RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress,
+            new RMQNormalListner());
+
+        producer.send(tag1, msgSize);
+        Assert.assertEquals("Not all sent succeeded", msgSize,
+            producer.getAllUndupMsgBody().size());
+        List<Object> tag2Msgs = MQMessageFactory.getRMQMessage(tag2, topic, msgSize);
+        producer.send(tag2Msgs);
+        Assert.assertEquals("Not all sent succeeded", msgSize * 2,
+            producer.getAllUndupMsgBody().size());
+
+        // Only the tag2 bodies are waited for; the tag1 messages do not match the subscription.
+        consumer.getListner().waitForMessageConsume(MQMessageFactory.getMessageBody(tag2Msgs),
+            consumeTime);
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer.getListner().getAllMsgBody()))
+            .containsExactlyElementsIn(MQMessageFactory.getMessageBody(tag2Msgs));
+    }
+
+    /**
+     * Subscribes to both tags that are sent and expects the bodies of all tagged messages to
+     * be consumed.
+     */
+    @Test
+    public void testSubTwoTabAndMatchTwo() {
+        String[] tags = {"jueyin1", "jueyin2"};
+        String subExpress = String.format("%s||%s", tags[0], tags[1]);
+        int msgSize = 10;
+
+        TagMessage tagMessage = new TagMessage(tags, topic, msgSize);
+        RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress,
+            new RMQNormalListner());
+
+        producer.send(tagMessage.getMixedTagMessages());
+        Assert.assertEquals("Not all sent succeeded", msgSize * tags.length,
+            producer.getAllUndupMsgBody().size());
+
+        consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
+
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer.getListner().getAllMsgBody()))
+            .containsExactlyElementsIn(tagMessage.getAllTagMessageBody());
+    }
+
+    /**
+     * Sends messages with three tags while subscribing to the first two, and expects exactly
+     * the bodies of those two tags to be consumed.
+     */
+    @Test
+    public void testSubThreeTabAndMatchTwo() {
+        String[] tags = {"jueyin1", "jueyin2", "jueyin3"};
+        String subExpress = String.format("%s||%s", tags[0], tags[1]);
+        int msgSize = 10;
+
+        TagMessage tagMessage = new TagMessage(tags, topic, msgSize);
+        RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress,
+            new RMQNormalListner());
+
+        producer.send(tagMessage.getMixedTagMessages());
+        Assert.assertEquals("Not all sent succeeded", msgSize * tags.length,
+            producer.getAllUndupMsgBody().size());
+
+        consumer.getListner().waitForMessageConsume(
+            tagMessage.getMessageBodyByTag(tags[0], tags[1]), consumeTime);
+
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer.getListner().getAllMsgBody())).containsExactlyElementsIn(
+            tagMessage.getMessageBodyByTag(tags[0], tags[1]));
+    }
+
+    /**
+     * Subscribes to a tag that is never sent and expects that, after a fixed five-second
+     * wait, none of the sent bodies has been consumed.
+     */
+    @Test
+    public void testNoMatch() {
+        String[] tags = {"jueyin1", "jueyin2", "jueyin3"};
+        String subExpress = "no_match";
+        int msgSize = 10;
+
+        TagMessage tagMessage = new TagMessage(tags, topic, msgSize);
+        RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress,
+            new RMQNormalListner());
+
+        producer.send(tagMessage.getMixedTagMessages());
+        Assert.assertEquals("Not all sent succeeded", msgSize * tags.length,
+            producer.getAllUndupMsgBody().size());
+
+        // Nothing can be waited on when no message is expected, hence the fixed pause.
+        TestUtils.waitForSeconds(5);
+
+        assertThat(VerifyUtils
+            .getFilterdMessage(producer.getAllMsgBody(), consumer.getListner().getAllMsgBody())
+            .size()).isEqualTo(0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWith1ConsumerIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWith1ConsumerIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWith1ConsumerIT.java
new file mode 100644
index 0000000..f0a1d48
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWith1ConsumerIT.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.consumer.tag;
+
+import java.util.List;
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.factory.MQMessageFactory;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
+import org.apache.rocketmq.test.util.VerifyUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+/**
+ * Integration tests for a single consumer combined with different tag / subscription
+ * expression pairs ({@code "*"}, {@code null} or a concrete tag). Each test sends messages
+ * and compares the consumed bodies, passed through {@code VerifyUtils.getFilterdMessage},
+ * with the bodies expected for that subscription.
+ */
+public class TagMessageWith1ConsumerIT extends BaseConf {
+    private static Logger logger = Logger.getLogger(TagMessageWith1ConsumerIT.class);
+    private RMQNormalProducer producer = null;
+    private String topic = null;
+
+    /** Initialises a topic, a consumer group and a producer before every test. */
+    @Before
+    public void setUp() {
+        topic = initTopic();
+        String consumerId = initConsumerGroup();
+        logger.info(String.format("use topic: %s; consumerId: %s !", topic, consumerId));
+        producer = getProducer(nsAddr, topic);
+    }
+
+    /** Delegates to {@code BaseConf.shutDown()} after every test. */
+    @After
+    public void tearDown() {
+        super.shutDown();
+    }
+
+    /** Subscribes to the same tag that is sent and expects every sent body to be consumed. */
+    @Test
+    public void testTagSmoke() {
+        String tag = "jueyin";
+        int msgSize = 10;
+        RMQNormalConsumer consumer = getConsumer(nsAddr, topic, tag, new RMQNormalListner());
+        producer.send(tag, msgSize);
+        Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
+        consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer.getListner().getAllMsgBody()))
+            .containsExactlyElementsIn(producer.getAllMsgBody());
+    }
+
+    /**
+     * Subscribes with {@code "*"}, sends messages without passing a tag and expects every
+     * sent body to be consumed.
+     */
+    @Test
+    public void testSubAllMessageNoTag() {
+        String subExpress = "*";
+        int msgSize = 10;
+        RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress,
+            new RMQNormalListner());
+        producer.send(msgSize);
+        Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
+        consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
+
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer.getListner().getAllMsgBody()))
+            .containsExactlyElementsIn(producer.getAllMsgBody());
+    }
+
+    /**
+     * Subscribes with {@code "*"}, sends tagged messages and expects every sent body to be
+     * consumed.
+     */
+    @Test
+    public void testSubAllMessageWithTag() {
+        String tag = "jueyin";
+        String subExpress = "*";
+        int msgSize = 10;
+        RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress,
+            new RMQNormalListner());
+        producer.send(tag, msgSize);
+        Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
+        consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
+
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer.getListner().getAllMsgBody()))
+            .containsExactlyElementsIn(producer.getAllMsgBody());
+    }
+
+    /**
+     * Subscribes with {@code "*"}, sends messages with a {@code null} tag and expects every
+     * sent body to be consumed.
+     */
+    @Test
+    public void testSubAllMessageWithNullTag() {
+        String tag = null;
+        String subExpress = "*";
+        int msgSize = 10;
+        RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress,
+            new RMQNormalListner());
+        producer.send(tag, msgSize);
+        Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
+        consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
+
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer.getListner().getAllMsgBody()))
+            .containsExactlyElementsIn(producer.getAllMsgBody());
+    }
+
+    /**
+     * Subscribes with a {@code null} expression, sends messages with a {@code null} tag and
+     * expects every sent body to be consumed.
+     */
+    @Test
+    public void testSubNullWithTagNull() {
+        String tag = null;
+        String subExpress = null;
+        int msgSize = 10;
+        RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress,
+            new RMQNormalListner());
+        producer.send(tag, msgSize);
+        Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
+        consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
+
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer.getListner().getAllMsgBody()))
+            .containsExactlyElementsIn(producer.getAllMsgBody());
+    }
+
+    /**
+     * Subscribes with {@code "*"} and sends three batches (null tag, concrete tag, no tag
+     * argument); expects every sent body to be consumed.
+     */
+    @Test
+    public void testSubAllWithKindsOfMessage() {
+        String tag1 = null;
+        String tag2 = "jueyin";
+        String subExpress = "*";
+        int msgSize = 10;
+        RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress,
+            new RMQNormalListner());
+
+        List<Object> tag1Msgs = MQMessageFactory.getRMQMessage(tag1, topic, msgSize);
+        List<Object> tag2Msgs = MQMessageFactory.getRMQMessage(tag2, topic, msgSize);
+
+        producer.send(tag1Msgs);
+        producer.send(tag2Msgs);
+        // The third batch uses msgSize as well, so the "msgSize * 3" check below stays valid
+        // if the batch size is ever changed.
+        producer.send(msgSize);
+        Assert.assertEquals("Not all are sent", msgSize * 3,
+            producer.getAllUndupMsgBody().size());
+        consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
+
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer.getListner().getAllMsgBody()))
+            .containsExactlyElementsIn(producer.getAllMsgBody());
+    }
+
+    /**
+     * Subscribes with a {@code null} expression and sends two batches (null tag, concrete
+     * tag); expects every sent body to be consumed.
+     */
+    @Test
+    public void testSubNullWithKindsOfMessage() {
+        String tag1 = null;
+        String tag2 = "jueyin";
+        String subExpress = null;
+        int msgSize = 10;
+        RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress,
+            new RMQNormalListner());
+
+        List<Object> tag1Msgs = MQMessageFactory.getRMQMessage(tag1, topic, msgSize);
+        List<Object> tag2Msgs = MQMessageFactory.getRMQMessage(tag2, topic, msgSize);
+
+        producer.send(tag1Msgs);
+        producer.send(tag2Msgs);
+        Assert.assertEquals("Not all are sent", msgSize * 2,
+            producer.getAllUndupMsgBody().size());
+        consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
+
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer.getListner().getAllMsgBody()))
+            .containsExactlyElementsIn(producer.getAllMsgBody());
+    }
+
+    /**
+     * Subscribes to one concrete tag and sends three batches (null tag, that tag, no tag
+     * argument); expects exactly the bodies of the matching batch to be consumed.
+     */
+    @Test
+    public void testSubTagWithKindsOfMessage() {
+        String tag1 = null;
+        String tag2 = "jueyin";
+        String subExpress = tag2;
+        int msgSize = 10;
+        RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress,
+            new RMQNormalListner());
+
+        List<Object> tag1Msgs = MQMessageFactory.getRMQMessage(tag1, topic, msgSize);
+        List<Object> tag2Msgs = MQMessageFactory.getRMQMessage(tag2, topic, msgSize);
+
+        producer.send(tag1Msgs);
+        producer.send(tag2Msgs);
+        // The third batch uses msgSize as well, so the "msgSize * 3" check below stays valid
+        // if the batch size is ever changed.
+        producer.send(msgSize);
+        Assert.assertEquals("Not all are sent", msgSize * 3,
+            producer.getAllUndupMsgBody().size());
+        // Only the tag2 bodies are waited for; the other batches do not match the subscription.
+        consumer.getListner().waitForMessageConsume(MQMessageFactory.getMessageBody(tag2Msgs),
+            consumeTime);
+
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer.getListner().getAllMsgBody()))
+            .containsExactlyElementsIn(MQMessageFactory.getMessageBody(tag2Msgs));
+    }
+}

Reply via email to