http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java new file mode 100644 index 0000000..fdafdf0 --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgDynamicBalanceIT.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client.consumer.balance; + +import org.apache.log4j.Logger; +import org.apache.rocketmq.test.base.BaseConf; +import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; +import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.util.MQWait; +import org.apache.rocketmq.test.util.VerifyUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import static com.google.common.truth.Truth.assertThat; + +public class NormalMsgDynamicBalanceIT extends BaseConf { + private static Logger logger = Logger.getLogger(NormalMsgStaticBalanceIT.class); + private RMQNormalProducer producer = null; + private String topic = null; + + @Before + public void setUp() { + topic = initTopic(); + logger.info(String.format("use topic: %s !", topic)); + producer = getProducer(nsAddr, topic); + } + + @After + public void tearDown() { + super.shutDown(); + } + + @Test + public void testTwoConsumerAndCrashOne() { + int msgSize = 400; + RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner()); + RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic, + "*", new RMQNormalListner()); + + producer.send(msgSize); + + MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), consumer1.getListner(), + consumer2.getListner()); + consumer2.shutdown(); + + producer.send(msgSize); + Assert.assertEquals("Not all are sent", msgSize * 2, producer.getAllUndupMsgBody().size()); + + boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), + consumer1.getListner(), consumer2.getListner()); + assertThat(recvAll).isEqualTo(true); + + boolean balance = VerifyUtils.verifyBalance(msgSize, + VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer1.getListner().getAllUndupMsgBody()).size() - msgSize, + VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer2.getListner().getAllUndupMsgBody()).size()); + assertThat(balance).isEqualTo(true); + } + + @Test + public void test3ConsumerAndCrashOne() { + int msgSize = 400; + RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner()); + RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic, + "*", new RMQNormalListner()); + RMQNormalConsumer consumer3 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic, + "*", new RMQNormalListner()); + + producer.send(msgSize); + + MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), consumer1.getListner(), + consumer2.getListner(), consumer3.getListner()); + consumer3.shutdown(); + producer.clearMsg(); + consumer1.clearMsg(); + consumer2.clearMsg(); + + producer.send(msgSize); + Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size()); + + boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), + consumer1.getListner(), consumer2.getListner()); + assertThat(recvAll).isEqualTo(true); + + boolean balance = VerifyUtils.verifyBalance(msgSize, + VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer1.getListner().getAllUndupMsgBody()).size(), + VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer2.getListner().getAllUndupMsgBody()).size()); + assertThat(balance).isEqualTo(true); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgStaticBalanceIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgStaticBalanceIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgStaticBalanceIT.java new file mode 100644 index 0000000..117d643 --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/balance/NormalMsgStaticBalanceIT.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client.consumer.balance; + +import org.apache.log4j.Logger; +import org.apache.rocketmq.test.base.BaseConf; +import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; +import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.util.MQWait; +import org.apache.rocketmq.test.util.TestUtils; +import org.apache.rocketmq.test.util.VerifyUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import static com.google.common.truth.Truth.assertThat; + +public class NormalMsgStaticBalanceIT extends BaseConf { + private static Logger logger = Logger.getLogger(NormalMsgStaticBalanceIT.class); + private RMQNormalProducer producer = null; + private String topic = null; + + @Before + public void setUp() { + topic = initTopic(); + logger.info(String.format("use topic: %s !", topic)); + producer = getProducer(nsAddr, topic); + } + + @After + public void tearDown() { + super.shutDown(); + } + + @Test + public void testTwoConsumersBalance() { + int msgSize = 400; + RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner()); + RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic, + "*", new RMQNormalListner()); + TestUtils.waitForSeconds(waitTime); + + producer.send(msgSize); + Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size()); + + boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), + consumer1.getListner(), consumer2.getListner()); + assertThat(recvAll).isEqualTo(true); + + boolean balance = VerifyUtils.verifyBalance(msgSize, + VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer1.getListner().getAllUndupMsgBody()).size(), + VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer2.getListner().getAllUndupMsgBody()).size()); + assertThat(balance).isEqualTo(true); + } + + @Test + public void testFourConsumersBalance() { + int msgSize = 600; + RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner()); + RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic, + "*", new RMQNormalListner()); + RMQNormalConsumer consumer3 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic, + "*", new RMQNormalListner()); + RMQNormalConsumer consumer4 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic, + "*", new RMQNormalListner()); + TestUtils.waitForSeconds(waitTime); + + producer.send(msgSize); + Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size()); + + boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), + consumer1.getListner(), consumer2.getListner(), consumer3.getListner(), + consumer4.getListner()); + assertThat(recvAll).isEqualTo(true); + + boolean balance = VerifyUtils + .verifyBalance(msgSize, + VerifyUtils + .getFilterdMessage(producer.getAllMsgBody(), + consumer1.getListner().getAllUndupMsgBody()) + .size(), + VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer2.getListner().getAllUndupMsgBody()).size(), + VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer3.getListner().getAllUndupMsgBody()).size(), + VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer4.getListner().getAllUndupMsgBody()).size()); + assertThat(balance).isEqualTo(true); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/BaseBroadCastIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/BaseBroadCastIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/BaseBroadCastIT.java new file mode 100644 index 0000000..94efaef --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/BaseBroadCastIT.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client.consumer.broadcast; + +import org.apache.log4j.Logger; +import org.apache.rocketmq.test.base.BaseConf; +import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer; +import org.apache.rocketmq.test.factory.ConsumerFactory; +import org.apache.rocketmq.test.listener.AbstractListener; + +public class BaseBroadCastIT extends BaseConf { + private static Logger logger = Logger.getLogger(BaseBroadCastIT.class); + + public static RMQBroadCastConsumer getBroadCastConsumer(String nsAddr, String topic, + String subExpression, + AbstractListener listner) { + String consumerGroup = initConsumerGroup(); + return getBroadCastConsumer(nsAddr, consumerGroup, topic, subExpression, listner); + } + + public static RMQBroadCastConsumer getBroadCastConsumer(String nsAddr, String consumerGroup, + String topic, String subExpression, + AbstractListener listner) { + RMQBroadCastConsumer consumer = ConsumerFactory.getRMQBroadCastConsumer(nsAddr, + consumerGroup, topic, subExpression, listner); + + consumer.setDebug(); + + mqClients.add(consumer); + logger.info(String.format("consumer[%s] start,topic[%s],subExpression[%s]", consumerGroup, + topic, subExpression)); + return consumer; + } + + public void printSeperator() { + for (int i = 0; i < 3; i++) { + logger.info( + "<<<<<<<<================================================================================>>>>>>>>"); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgNotRecvIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgNotRecvIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgNotRecvIT.java new file mode 100644 index 0000000..65762fa --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgNotRecvIT.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client.consumer.broadcast.normal; + +import org.apache.log4j.Logger; +import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT; +import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer; +import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.util.VerifyUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import static com.google.common.truth.Truth.assertThat; + +public class BroadCastNormalMsgNotRecvIT extends BaseBroadCastIT { + private static Logger logger = Logger + .getLogger(NormalMsgTwoSameGroupConsumerIT.class); + private RMQNormalProducer producer = null; + private String topic = null; + + @Before + public void setUp() { + printSeperator(); + topic = initTopic(); + logger.info(String.format("use topic: %s;", topic)); + producer = getProducer(nsAddr, topic); + } + + @After + public void tearDown() { + super.shutDown(); + } + + @Test + public void testNotConsumeAfterConsume() { + int msgSize = 16; + + String group = initConsumerGroup(); + RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, group, topic, "*", + new RMQNormalListner(group + "_1")); + + producer.send(msgSize); + Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size()); + + consumer1.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer1.getListner().getAllMsgBody())) + .containsExactlyElementsIn(producer.getAllMsgBody()); + + RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr, + consumer1.getConsumerGroup(), topic, "*", new RMQNormalListner(group + "_2")); + consumer2.getListner().waitForMessageConsume(producer.getAllMsgBody(), waitTime); + assertThat(consumer2.getListner().getAllMsgBody().size()).isEqualTo(0); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgRecvCrashIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgRecvCrashIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgRecvCrashIT.java new file mode 100644 index 0000000..b878d09 --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgRecvCrashIT.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client.consumer.broadcast.normal; + +import org.apache.log4j.Logger; +import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT; +import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer; +import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.util.TestUtils; +import org.apache.rocketmq.test.util.VerifyUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import static com.google.common.truth.Truth.assertThat; + +public class BroadCastNormalMsgRecvCrashIT extends BaseBroadCastIT { + private static Logger logger = Logger + .getLogger(NormalMsgTwoSameGroupConsumerIT.class); + private RMQNormalProducer producer = null; + private String topic = null; + + @Before + public void setUp() { + printSeperator(); + topic = initTopic(); + logger.info(String.format("use topic: %s;", topic)); + producer = getProducer(nsAddr, topic); + } + + @After + public void tearDown() { + super.shutDown(); + } + + @Test + public void testStartTwoAndCrashOneLater() { + int msgSize = 16; + + String group = initConsumerGroup(); + RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, group, topic, "*", + new RMQNormalListner(group + "_1")); + RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr, + consumer1.getConsumerGroup(), topic, "*", new RMQNormalListner(group + "_2")); + TestUtils.waitForSeconds(waitTime); + + producer.send(msgSize); + Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size()); + + consumer1.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + consumer2.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer1.getListner().getAllMsgBody())) + .containsExactlyElementsIn(producer.getAllMsgBody()); + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer2.getListner().getAllMsgBody())) + .containsExactlyElementsIn(producer.getAllMsgBody()); + + consumer2.shutdown(); + + producer.clearMsg(); + consumer1.clearMsg(); + + producer.send(msgSize); + Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size()); + + consumer1.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer1.getListner().getAllMsgBody())) + .containsExactlyElementsIn(producer.getAllMsgBody()); + + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgRecvFailIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgRecvFailIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgRecvFailIT.java new file mode 100644 index 0000000..26c37f9 --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgRecvFailIT.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client.consumer.broadcast.normal; + +import org.apache.log4j.Logger; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT; +import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer; +import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.util.VerifyUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import static com.google.common.truth.Truth.assertThat; + +public class BroadCastNormalMsgRecvFailIT extends BaseBroadCastIT { + private static Logger logger = Logger + .getLogger(NormalMsgTwoSameGroupConsumerIT.class); + private RMQNormalProducer producer = null; + private String topic = null; + + @Before + public void setUp() { + printSeperator(); + topic = initTopic(); + logger.info(String.format("use topic: %s;", topic)); + producer = getProducer(nsAddr, topic); + } + + @After + public void tearDown() { + super.shutDown(); + } + + @Test + public void testStartTwoConsumerAndOneConsumerFail() { + int msgSize = 16; + + RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, topic, "*", + new RMQNormalListner()); + RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr, + consumer1.getConsumerGroup(), topic, "*", + new RMQNormalListner(ConsumeConcurrentlyStatus.RECONSUME_LATER)); + + producer.send(msgSize); + Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size()); + + consumer1.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer1.getListner().getAllMsgBody())) + .containsExactlyElementsIn(producer.getAllMsgBody()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgRecvStartLaterIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgRecvStartLaterIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgRecvStartLaterIT.java new file mode 100644 index 0000000..027f648 --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgRecvStartLaterIT.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client.consumer.broadcast.normal; + +import org.apache.log4j.Logger; +import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT; +import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer; +import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.util.TestUtils; +import org.apache.rocketmq.test.util.VerifyUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import static com.google.common.truth.Truth.assertThat; + +public class BroadCastNormalMsgRecvStartLaterIT extends BaseBroadCastIT { + private static Logger logger = Logger + .getLogger(NormalMsgTwoSameGroupConsumerIT.class); + private RMQNormalProducer producer = null; + private String topic = null; + + @Before + public void setUp() { + printSeperator(); + topic = initTopic(); + logger.info(String.format("use topic: %s;", topic)); + producer = getProducer(nsAddr, topic); + } + + @After + public void tearDown() { + super.shutDown(); + } + + @Test + public void testStartOneAndStartAnotherLater() { + int msgSize = 16; + + String group = initConsumerGroup(); + RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, group, topic, "*", + new RMQNormalListner(group + "_1")); + TestUtils.waitForSeconds(waitTime); + + producer.send(msgSize); + Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size()); + + consumer1.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer1.getListner().getAllMsgBody())) + .containsExactlyElementsIn(producer.getAllMsgBody()); + + producer.clearMsg(); + consumer1.clearMsg(); + + RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr, + consumer1.getConsumerGroup(), topic, "*", new RMQNormalListner(group + "_2")); + TestUtils.waitForSeconds(waitTime); + producer.send(msgSize); + Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size()); + + consumer1.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + consumer2.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer1.getListner().getAllMsgBody())) + .containsExactlyElementsIn(producer.getAllMsgBody()); + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer2.getListner().getAllMsgBody())) + .containsExactlyElementsIn(producer.getAllMsgBody()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgTwoDiffGroupRecvIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgTwoDiffGroupRecvIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgTwoDiffGroupRecvIT.java new file mode 100644 index 0000000..acbaf23 --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/BroadCastNormalMsgTwoDiffGroupRecvIT.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client.consumer.broadcast.normal; + +import org.apache.log4j.Logger; +import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT; +import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer; +import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.util.TestUtils; +import org.apache.rocketmq.test.util.VerifyUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import static com.google.common.truth.Truth.assertThat; + +public class BroadCastNormalMsgTwoDiffGroupRecvIT extends BaseBroadCastIT { + private static Logger logger = Logger + .getLogger(NormalMsgTwoSameGroupConsumerIT.class); + private RMQNormalProducer producer = null; + private String topic = null; + + @Before + public void setUp() { + printSeperator(); + topic = initTopic(); + logger.info(String.format("use topic: %s;", topic)); + producer = getProducer(nsAddr, topic); + } + + @After + public void tearDown() { + super.shutDown(); + } + + @Test + public void testStartDiffSameGroupConsumer() { + int msgSize = 16; + + String group1 = initConsumerGroup(); + String group2 = initConsumerGroup(); + RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, group1, topic, "*", + new RMQNormalListner(group1 + "_1")); + RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr, group2, topic, "*", + new RMQNormalListner(group2 + "_2")); + TestUtils.waitForSeconds(waitTime); + + producer.send(msgSize); + Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size()); + + consumer1.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + consumer2.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer1.getListner().getAllMsgBody())) + .containsExactlyElementsIn(producer.getAllMsgBody()); + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer2.getListner().getAllMsgBody())) + .containsExactlyElementsIn(producer.getAllMsgBody()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/NormalMsgTwoSameGroupConsumerIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/NormalMsgTwoSameGroupConsumerIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/NormalMsgTwoSameGroupConsumerIT.java new file mode 100644 index 0000000..984c941 --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/normal/NormalMsgTwoSameGroupConsumerIT.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client.consumer.broadcast.normal; + +import org.apache.log4j.Logger; +import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT; +import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer; +import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.util.TestUtils; +import org.apache.rocketmq.test.util.VerifyUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import static com.google.common.truth.Truth.assertThat; + +public class NormalMsgTwoSameGroupConsumerIT extends BaseBroadCastIT { + private static Logger logger = Logger + .getLogger(NormalMsgTwoSameGroupConsumerIT.class); + private RMQNormalProducer producer = null; + private String topic = null; + + @Before + public void setUp() { + printSeperator(); + topic = initTopic(); + logger.info(String.format("use topic: %s;", topic)); + producer = getProducer(nsAddr, topic); + } + + @After + public void tearDown() { + super.shutDown(); + } + + @Test + public void testStartTwoSameGroupConsumer() { + int msgSize = 16; + + String group = initConsumerGroup(); + RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, group, topic, "*", + new RMQNormalListner(group + "_1")); + RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr, + consumer1.getConsumerGroup(), topic, "*", new RMQNormalListner(group + "_2")); + TestUtils.waitForSeconds(waitTime); + + producer.send(msgSize); + Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size()); + + consumer1.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + consumer2.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer1.getListner().getAllMsgBody())) + .containsExactlyElementsIn(producer.getAllMsgBody()); + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer2.getListner().getAllMsgBody())) + .containsExactlyElementsIn(producer.getAllMsgBody()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/order/OrderMsgBroadCastIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/order/OrderMsgBroadCastIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/order/OrderMsgBroadCastIT.java new file mode 100644 index 0000000..ac8fcf5 --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/order/OrderMsgBroadCastIT.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client.consumer.broadcast.order; + +import java.util.List; +import org.apache.log4j.Logger; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT; +import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer; +import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; +import org.apache.rocketmq.test.listener.rmq.order.RMQOrderListener; +import org.apache.rocketmq.test.message.MessageQueueMsg; +import org.apache.rocketmq.test.util.TestUtils; +import org.apache.rocketmq.test.util.VerifyUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static com.google.common.truth.Truth.assertThat; + +public class OrderMsgBroadCastIT extends BaseBroadCastIT { + private static Logger logger = Logger.getLogger(OrderMsgBroadCastIT.class); + private RMQNormalProducer producer = null; + private String topic = null; + + @Before + public void setUp() { + topic = initTopic(); + logger.info(String.format("use topic: %s;", topic)); + producer = getProducer(nsAddr, topic); + } + + @After + public void tearDown() { + super.shutDown(); + } + + @Test + public void testTwoConsumerSubTag() { + int msgSize = 10; + + RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, topic, "*", + new RMQOrderListener()); + RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr, + consumer1.getConsumerGroup(), topic, "*", new RMQOrderListener()); + TestUtils.waitForSeconds(waitTime); + + List<MessageQueue> mqs = producer.getMessageQueue(); + MessageQueueMsg mqMsgs = new MessageQueueMsg(mqs, msgSize); + producer.send(mqMsgs.getMsgsWithMQ()); + + consumer1.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + consumer2.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + + assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer1.getListner()).getMsgs())) + .isEqualTo(true); + assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer2.getListner()).getMsgs())) + .isEqualTo(true); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/tag/BroadCastTwoConsumerFilterIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/tag/BroadCastTwoConsumerFilterIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/tag/BroadCastTwoConsumerFilterIT.java new file mode 100644 index 0000000..a1a2ff7 --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/tag/BroadCastTwoConsumerFilterIT.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client.consumer.broadcast.tag; + +import org.apache.log4j.Logger; +import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT; +import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer; +import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.util.TestUtils; +import org.apache.rocketmq.test.util.VerifyUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import static com.google.common.truth.Truth.assertThat; + +public class BroadCastTwoConsumerFilterIT extends BaseBroadCastIT { + private static Logger logger = Logger.getLogger(BroadCastTwoConsumerSubTagIT.class); + private RMQNormalProducer producer = null; + private String topic = null; + + @Before + public void setUp() { + topic = initTopic(); + logger.info(String.format("use topic: %s;", topic)); + producer = getProducer(nsAddr, topic); + } + + @After + public void tearDown() { + super.shutDown(); + } + + @Test + public void testTwoConsumerFilter() { + int msgSize = 40; + String tag1 = "jueyin_tag_1"; + String tag2 = "jueyin_tag_2"; + + RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, topic, tag1, + new RMQNormalListner()); + RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr, + consumer1.getConsumerGroup(), topic, tag1, new RMQNormalListner()); + TestUtils.waitForSeconds(waitTime); + + producer.send(tag2, msgSize); + Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size()); + producer.clearMsg(); + producer.send(tag1, msgSize); + + consumer1.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + consumer2.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer1.getListner().getAllMsgBody())) + .containsExactlyElementsIn(producer.getAllMsgBody()); + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer2.getListner().getAllMsgBody())) + .containsExactlyElementsIn(producer.getAllMsgBody()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/tag/BroadCastTwoConsumerSubDiffTagIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/tag/BroadCastTwoConsumerSubDiffTagIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/tag/BroadCastTwoConsumerSubDiffTagIT.java new file mode 100644 index 0000000..5c2e7fc --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/tag/BroadCastTwoConsumerSubDiffTagIT.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client.consumer.broadcast.tag; + +import org.apache.log4j.Logger; +import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT; +import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer; +import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.util.TestUtils; +import org.apache.rocketmq.test.util.VerifyUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import static com.google.common.truth.Truth.assertThat; + +public class BroadCastTwoConsumerSubDiffTagIT extends BaseBroadCastIT { + private static Logger logger = Logger.getLogger(BroadCastTwoConsumerSubTagIT.class); + private RMQNormalProducer producer = null; + private String topic = null; + + @Before + public void setUp() { + topic = initTopic(); + logger.info(String.format("use topic: %s;", topic)); + producer = getProducer(nsAddr, topic); + } + + @After + public void tearDown() { + super.shutDown(); + } + + @Test + public void testTwoConsumerSubDiffTag() { + int msgSize = 40; + String tag = "jueyin_tag"; + + RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, topic, "*", + new RMQNormalListner()); + RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr, + consumer1.getConsumerGroup(), topic, tag, new RMQNormalListner()); + TestUtils.waitForSeconds(waitTime); + + producer.send(tag, msgSize); + Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size()); + + consumer1.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + consumer2.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer1.getListner().getAllMsgBody())) + .containsExactlyElementsIn(producer.getAllMsgBody()); + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer2.getListner().getAllMsgBody())) + .containsExactlyElementsIn(producer.getAllMsgBody()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/tag/BroadCastTwoConsumerSubTagIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/tag/BroadCastTwoConsumerSubTagIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/tag/BroadCastTwoConsumerSubTagIT.java new file mode 100644 index 0000000..e4510de --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/tag/BroadCastTwoConsumerSubTagIT.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client.consumer.broadcast.tag; + +import org.apache.log4j.Logger; +import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT; +import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer; +import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.util.TestUtils; +import org.apache.rocketmq.test.util.VerifyUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import static com.google.common.truth.Truth.assertThat; + +public class BroadCastTwoConsumerSubTagIT extends BaseBroadCastIT { + private static Logger logger = Logger.getLogger(BroadCastTwoConsumerSubTagIT.class); + private RMQNormalProducer producer = null; + private String topic = null; + + @Before + public void setUp() { + topic = initTopic(); + logger.info(String.format("use topic: %s;", topic)); + producer = getProducer(nsAddr, topic); + } + + @After + public void tearDown() { + super.shutDown(); + } + + @Test + public void testTwoConsumerSubTag() { + int msgSize = 20; + String tag = "jueyin_tag"; + + RMQBroadCastConsumer consumer1 = getBroadCastConsumer(nsAddr, topic, tag, + new RMQNormalListner()); + RMQBroadCastConsumer consumer2 = getBroadCastConsumer(nsAddr, + consumer1.getConsumerGroup(), topic, tag, new RMQNormalListner()); + TestUtils.waitForSeconds(waitTime); + + producer.send(tag, msgSize); + Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size()); + + consumer1.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + consumer2.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer1.getListner().getAllMsgBody())) + .containsExactlyElementsIn(producer.getAllMsgBody()); + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer2.getListner().getAllMsgBody())) + .containsExactlyElementsIn(producer.getAllMsgBody()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/consumer/cluster/DynamicAddAndCrashIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/cluster/DynamicAddAndCrashIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/cluster/DynamicAddAndCrashIT.java new file mode 100644 index 0000000..303dfa0 --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/cluster/DynamicAddAndCrashIT.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client.consumer.cluster; + +import org.apache.log4j.Logger; +import org.apache.rocketmq.test.base.BaseConf; +import org.apache.rocketmq.test.client.consumer.balance.NormalMsgStaticBalanceIT; +import org.apache.rocketmq.test.client.mq.MQAsyncProducer; +import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; +import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.util.MQWait; +import org.apache.rocketmq.test.util.TestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static com.google.common.truth.Truth.assertThat; + +public class DynamicAddAndCrashIT extends BaseConf { + private static Logger logger = Logger.getLogger(NormalMsgStaticBalanceIT.class); + private RMQNormalProducer producer = null; + private String topic = null; + + @Before + public void setUp() { + topic = initTopic(); + logger.info(String.format("use topic: %s !", topic)); + producer = getProducer(nsAddr, topic); + } + + @After + public void tearDown() { + super.shutDown(); + } + + @Test + public void testAddOneConsumerAndCrashAfterWhile() { + int msgSize = 150; + RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner()); + + MQAsyncProducer asyncDefaultMQProducer = new MQAsyncProducer(producer, msgSize, 100); + asyncDefaultMQProducer.start(); + TestUtils.waitForSeconds(waitTime); + + RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic, + "*", new RMQNormalListner()); + TestUtils.waitForSeconds(waitTime); + consumer2.shutdown(); + + asyncDefaultMQProducer.waitSendAll(waitTime * 6); + + MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), consumer1.getListner(), + consumer2.getListner()); + + boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), + consumer1.getListner(), consumer2.getListner()); + assertThat(recvAll).isEqualTo(true); + } + + @Test + public void testAddTwoConsumerAndCrashAfterWhile() { + int msgSize = 150; + RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner()); + + MQAsyncProducer asyncDefaultMQProducer = new MQAsyncProducer(producer, msgSize, 100); + asyncDefaultMQProducer.start(); + TestUtils.waitForSeconds(waitTime); + + RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic, + "*", new RMQNormalListner()); + RMQNormalConsumer consumer3 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic, + "*", new RMQNormalListner()); + TestUtils.waitForSeconds(waitTime); + + consumer2.shutdown(); + consumer3.shutdown(); + + asyncDefaultMQProducer.waitSendAll(waitTime * 6); + + MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), consumer1.getListner(), + consumer2.getListner(), consumer3.getListner()); + + boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), + consumer1.getListner(), consumer2.getListner(), consumer3.getListner()); + assertThat(recvAll).isEqualTo(true); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/consumer/cluster/DynamicAddConsumerIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/cluster/DynamicAddConsumerIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/cluster/DynamicAddConsumerIT.java new file mode 100644 index 0000000..46dbb70 --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/cluster/DynamicAddConsumerIT.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client.consumer.cluster; + +import org.apache.log4j.Logger; +import org.apache.rocketmq.test.base.BaseConf; +import org.apache.rocketmq.test.client.consumer.balance.NormalMsgStaticBalanceIT; +import org.apache.rocketmq.test.client.mq.MQAsyncProducer; +import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; +import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.util.MQWait; +import org.apache.rocketmq.test.util.TestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static com.google.common.truth.Truth.assertThat; + +public class DynamicAddConsumerIT extends BaseConf { + private static Logger logger = Logger.getLogger(NormalMsgStaticBalanceIT.class); + private RMQNormalProducer producer = null; + private String topic = null; + + @Before + public void setUp() { + topic = initTopic(); + logger.info(String.format("use topic: %s !", topic)); + producer = getProducer(nsAddr, topic); + } + + @After + public void tearDown() { + super.shutDown(); + } + + @Test + public void testAddOneConsumer() { + int msgSize = 100; + RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner()); + + MQAsyncProducer asyncDefaultMQProducer = new MQAsyncProducer(producer, msgSize, 100); + asyncDefaultMQProducer.start(); + TestUtils.waitForSeconds(waitTime); + + RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic, + "*", new RMQNormalListner()); + + asyncDefaultMQProducer.waitSendAll(waitTime * 6); + + MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), consumer1.getListner(), + consumer2.getListner()); + + boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), + consumer1.getListner(), consumer2.getListner()); + assertThat(recvAll).isEqualTo(true); + } + + @Test + public void testAddTwoConsumer() { + int msgSize = 100; + RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner()); + + MQAsyncProducer asyncDefaultMQProducer = new MQAsyncProducer(producer, msgSize, 100); + asyncDefaultMQProducer.start(); + TestUtils.waitForSeconds(waitTime); + + RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic, + "*", new RMQNormalListner()); + RMQNormalConsumer consumer3 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic, + "*", new RMQNormalListner()); + + asyncDefaultMQProducer.waitSendAll(waitTime * 6); + + MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), consumer1.getListner(), + consumer2.getListner(), consumer3.getListner()); + + boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), + consumer1.getListner(), consumer2.getListner(), consumer3.getListner()); + assertThat(recvAll).isEqualTo(true); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/consumer/cluster/DynamicCrashConsumerIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/cluster/DynamicCrashConsumerIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/cluster/DynamicCrashConsumerIT.java new file mode 100644 index 0000000..807f950 --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/cluster/DynamicCrashConsumerIT.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client.consumer.cluster; + +import org.apache.log4j.Logger; +import org.apache.rocketmq.test.base.BaseConf; +import org.apache.rocketmq.test.client.consumer.balance.NormalMsgStaticBalanceIT; +import org.apache.rocketmq.test.client.mq.MQAsyncProducer; +import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; +import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.util.MQWait; +import org.apache.rocketmq.test.util.TestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static com.google.common.truth.Truth.assertThat; + +public class DynamicCrashConsumerIT extends BaseConf { + private static Logger logger = Logger.getLogger(NormalMsgStaticBalanceIT.class); + private RMQNormalProducer producer = null; + private String topic = null; + + @Before + public void setUp() { + topic = initTopic(); + logger.info(String.format("use topic: %s !", topic)); + producer = getProducer(nsAddr, topic); + } + + @After + public void tearDown() { + super.shutDown(); + } + + @Test + public void testAddOneConsumer() { + int msgSize = 100; + RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner()); + RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic, + "*", new RMQNormalListner()); + + MQAsyncProducer asyncDefaultMQProducer = new MQAsyncProducer(producer, msgSize, 100); + asyncDefaultMQProducer.start(); + TestUtils.waitForSeconds(waitTime); + + consumer2.shutdown(); + + asyncDefaultMQProducer.waitSendAll(waitTime * 6); + + MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), consumer1.getListner(), + consumer2.getListner()); + + boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), + consumer1.getListner(), consumer2.getListner()); + assertThat(recvAll).isEqualTo(true); + } + + @Test + public void testAddTwoConsumer() { + int msgSize = 100; + RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, "*", new RMQNormalListner()); + RMQNormalConsumer consumer2 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic, + "*", new RMQNormalListner()); + RMQNormalConsumer consumer3 = getConsumer(nsAddr, consumer1.getConsumerGroup(), topic, + "*", new RMQNormalListner()); + + MQAsyncProducer asyncDefaultMQProducer = new MQAsyncProducer(producer, msgSize, 100); + asyncDefaultMQProducer.start(); + TestUtils.waitForSeconds(waitTime); + + consumer2.shutdown(); + consumer3.shutdown(); + + asyncDefaultMQProducer.waitSendAll(waitTime * 6); + + MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), consumer1.getListner(), + consumer2.getListner(), consumer3.getListner()); + + boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(), + consumer1.getListner(), consumer2.getListner(), consumer3.getListner()); + assertThat(recvAll).isEqualTo(true); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/MulTagSubIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/MulTagSubIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/MulTagSubIT.java new file mode 100644 index 0000000..ecb204e --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/MulTagSubIT.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client.consumer.tag; + +import java.util.List; +import org.apache.log4j.Logger; +import org.apache.rocketmq.test.base.BaseConf; +import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; +import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; +import org.apache.rocketmq.test.factory.MQMessageFactory; +import org.apache.rocketmq.test.factory.TagMessage; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.util.TestUtils; +import org.apache.rocketmq.test.util.VerifyUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import static com.google.common.truth.Truth.assertThat; + +public class MulTagSubIT extends BaseConf { + private static Logger logger = Logger.getLogger(TagMessageWith1ConsumerIT.class); + private RMQNormalProducer producer = null; + private String topic = null; + + @Before + public void setUp() { + topic = initTopic(); + String consumerId = initConsumerGroup(); + logger.info(String.format("use topic: %s; consumerId: %s !", topic, consumerId)); + producer = getProducer(nsAddr, topic); + } + + @After + public void tearDown() { + super.shutDown(); + } + + @Test + public void testSubTwoTabMessageOnsTag() { + String tag = "jueyin1"; + String subExpress = String.format("%s||jueyin2", tag); + int msgSize = 10; + RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress, + new RMQNormalListner()); + producer.send(tag, msgSize); + Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size()); + consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer.getListner().getAllMsgBody())) + .containsExactlyElementsIn(producer.getAllMsgBody()); + } + + @Test + public void testSubTwoTabAndMatchOne() { + String tag1 = "jueyin1"; + String tag2 = "jueyin2"; + String subExpress = String.format("%s||noExistTag", tag2); + int msgSize = 10; + RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress, + new RMQNormalListner()); + + producer.send(tag1, msgSize); + Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size()); + List<Object> tag2Msgs = MQMessageFactory.getRMQMessage(tag2, topic, msgSize); + producer.send(tag2Msgs); + Assert.assertEquals("Not all sent succeeded", msgSize * 2, producer.getAllUndupMsgBody().size()); + + consumer.getListner().waitForMessageConsume(MQMessageFactory.getMessageBody(tag2Msgs), + consumeTime); + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer.getListner().getAllMsgBody())) + .containsExactlyElementsIn(MQMessageFactory.getMessageBody(tag2Msgs)); + } + + @Test + public void testSubTwoTabAndMatchTwo() { + String tags[] = {"jueyin1", "jueyin2"}; + String subExpress = String.format("%s||%s", tags[0], tags[1]); + int msgSize = 10; + + TagMessage tagMessage = new TagMessage(tags, topic, msgSize); + RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress, + new RMQNormalListner()); + + producer.send(tagMessage.getMixedTagMessages()); + Assert.assertEquals("Not all sent succeeded", msgSize * tags.length, + producer.getAllUndupMsgBody().size()); + + consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer.getListner().getAllMsgBody())) + .containsExactlyElementsIn(tagMessage.getAllTagMessageBody()); + } + + @Test + public void testSubThreeTabAndMatchTwo() { + String tags[] = {"jueyin1", "jueyin2", "jueyin3"}; + String subExpress = String.format("%s||%s", tags[0], tags[1]); + int msgSize = 10; + + TagMessage tagMessage = new TagMessage(tags, topic, msgSize); + RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress, + new RMQNormalListner()); + + producer.send(tagMessage.getMixedTagMessages()); + Assert.assertEquals("Not all sent succeeded", msgSize * tags.length, + producer.getAllUndupMsgBody().size()); + + consumer.getListner().waitForMessageConsume( + tagMessage.getMessageBodyByTag(tags[0], tags[1]), consumeTime); + + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer.getListner().getAllMsgBody())).containsExactlyElementsIn( + tagMessage.getMessageBodyByTag(tags[0], tags[1])); + } + + @Test + public void testNoMatch() { + String tags[] = {"jueyin1", "jueyin2", "jueyin3"}; + String subExpress = "no_match"; + int msgSize = 10; + + TagMessage tagMessage = new TagMessage(tags, topic, msgSize); + RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress, + new RMQNormalListner()); + + producer.send(tagMessage.getMixedTagMessages()); + Assert.assertEquals("Not all sent succeeded", msgSize * tags.length, + producer.getAllUndupMsgBody().size()); + + TestUtils.waitForSeconds(5); + + assertThat(VerifyUtils + .getFilterdMessage(producer.getAllMsgBody(), consumer.getListner().getAllMsgBody()) + .size()).isEqualTo(0); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWith1ConsumerIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWith1ConsumerIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWith1ConsumerIT.java new file mode 100644 index 0000000..f0a1d48 --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/tag/TagMessageWith1ConsumerIT.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client.consumer.tag; + +import java.util.List; +import org.apache.log4j.Logger; +import org.apache.rocketmq.test.base.BaseConf; +import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; +import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; +import org.apache.rocketmq.test.factory.MQMessageFactory; +import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner; +import org.apache.rocketmq.test.util.VerifyUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import static com.google.common.truth.Truth.assertThat; + +public class TagMessageWith1ConsumerIT extends BaseConf { + private static Logger logger = Logger.getLogger(TagMessageWith1ConsumerIT.class); + private RMQNormalProducer producer = null; + private String topic = null; + + @Before + public void setUp() { + topic = initTopic(); + String consumerId = initConsumerGroup(); + logger.info(String.format("use topic: %s; consumerId: %s !", topic, consumerId)); + producer = getProducer(nsAddr, topic); + } + + @After + public void tearDown() { + super.shutDown(); + } + + @Test + public void testTagSmoke() { + String tag = "jueyin"; + int msgSize = 10; + RMQNormalConsumer consumer = getConsumer(nsAddr, topic, tag, new RMQNormalListner()); + producer.send(tag, msgSize); + Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size()); + consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer.getListner().getAllMsgBody())) + .containsExactlyElementsIn(producer.getAllMsgBody()); + } + + @Test + public void testSubAllMessageNoTag() { + String subExprress = "*"; + int msgSize = 10; + RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExprress, + new RMQNormalListner()); + producer.send(msgSize); + Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size()); + consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer.getListner().getAllMsgBody())) + .containsExactlyElementsIn(producer.getAllMsgBody()); + } + + @Test + public void testSubAllMessageWithTag() { + String tag = "jueyin"; + String subExpress = "*"; + int msgSize = 10; + RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress, + new RMQNormalListner()); + producer.send(tag, msgSize); + Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size()); + consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer.getListner().getAllMsgBody())) + .containsExactlyElementsIn(producer.getAllMsgBody()); + } + + @Test + public void testSubAllMessageWithNullTag() { + String tag = null; + String subExpress = "*"; + int msgSize = 10; + RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress, + new RMQNormalListner()); + producer.send(tag, msgSize); + Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size()); + consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer.getListner().getAllMsgBody())) + .containsExactlyElementsIn(producer.getAllMsgBody()); + } + + @Test + public void testSubNullWithTagNull() { + String tag = null; + String subExpress = null; + int msgSize = 10; + RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress, + new RMQNormalListner()); + producer.send(tag, msgSize); + Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size()); + consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer.getListner().getAllMsgBody())) + .containsExactlyElementsIn(producer.getAllMsgBody()); + } + + @Test + public void testSubAllWithKindsOfMessage() { + String tag1 = null; + String tag2 = "jueyin"; + String subExpress = "*"; + int msgSize = 10; + RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress, + new RMQNormalListner()); + + List<Object> tag1Msgs = MQMessageFactory.getRMQMessage(tag1, topic, msgSize); + List<Object> tag2Msgs = MQMessageFactory.getRMQMessage(tag2, topic, msgSize); + + producer.send(tag1Msgs); + producer.send(tag2Msgs); + producer.send(10); + Assert.assertEquals("Not all are sent", msgSize * 3, producer.getAllUndupMsgBody().size()); + consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer.getListner().getAllMsgBody())) + .containsExactlyElementsIn(producer.getAllMsgBody()); + } + + @Test + public void testSubNullWithKindsOfMessage() { + String tag1 = null; + String tag2 = "jueyin"; + String subExpress = null; + int msgSize = 10; + RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress, + new RMQNormalListner()); + + List<Object> tag1Msgs = MQMessageFactory.getRMQMessage(tag1, topic, msgSize); + List<Object> tag2Msgs = MQMessageFactory.getRMQMessage(tag2, topic, msgSize); + + producer.send(tag1Msgs); + producer.send(tag2Msgs); + Assert.assertEquals("Not all are sent", msgSize * 2, producer.getAllUndupMsgBody().size()); + consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime); + + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer.getListner().getAllMsgBody())) + .containsExactlyElementsIn(producer.getAllMsgBody()); + } + + @Test + public void testSubTagWithKindsOfMessage() { + String tag1 = null; + String tag2 = "jueyin"; + String subExpress = tag2; + int msgSize = 10; + RMQNormalConsumer consumer = getConsumer(nsAddr, topic, subExpress, + new RMQNormalListner()); + + List<Object> tag1Msgs = MQMessageFactory.getRMQMessage(tag1, topic, msgSize); + List<Object> tag2Msgs = MQMessageFactory.getRMQMessage(tag2, topic, msgSize); + + producer.send(tag1Msgs); + producer.send(tag2Msgs); + producer.send(10); + Assert.assertEquals("Not all are sent", msgSize * 3, producer.getAllUndupMsgBody().size()); + consumer.getListner().waitForMessageConsume(MQMessageFactory.getMessageBody(tag2Msgs), + consumeTime); + + assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), + consumer.getListner().getAllMsgBody())) + .containsExactlyElementsIn(MQMessageFactory.getMessageBody(tag2Msgs)); + } +}