http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgRebalanceIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgRebalanceIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgRebalanceIT.java new file mode 100644 index 0000000..57b69d2 --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgRebalanceIT.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.rocketmq.test.client.producer.order;
+
+import java.util.List;
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.listener.rmq.order.RMQOrderListener;
+import org.apache.rocketmq.test.message.MessageQueueMsg;
+import org.apache.rocketmq.test.util.MQWait;
+import org.apache.rocketmq.test.util.TestUtils;
+import org.apache.rocketmq.test.util.VerifyUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+/**
+ * Integration test for several consumers that share one consumer group and
+ * consume through {@link RMQOrderListener}.
+ *
+ * <p>Each test sends one {@link MessageQueueMsg} batch and then asserts that all
+ * messages were consumed, that {@code VerifyUtils.verifyBalance} accepts the
+ * per-consumer counts, and that {@code VerifyUtils.verifyOrder} accepts what every
+ * listener recorded.
+ */
+public class OrderMsgRebalanceIT extends BaseConf {
+    private static Logger logger = Logger.getLogger(OrderMsgRebalanceIT.class);
+    private RMQNormalProducer producer = null;
+    private String topic = null;
+
+    /** Creates a new topic and a producer for it before each test. */
+    @Before
+    public void setUp() {
+        topic = initTopic();
+        logger.info(String.format("use topic: %s !", topic));
+        producer = getProducer(nsAddr, topic);
+    }
+
+    /** Delegates cleanup to {@code BaseConf.shutDown()}. */
+    @After
+    public void tearDown() {
+        shutDown();
+    }
+
+    /** Two consumers in the same group. */
+    @Test
+    public void testTwoConsumersBalance() {
+        RMQNormalConsumer first = getConsumer(nsAddr, topic, "*", new RMQOrderListener());
+        String group = first.getConsumerGroup();
+        RMQNormalConsumer second = getConsumer(nsAddr, group, topic, "*", new RMQOrderListener());
+        TestUtils.waitForSeconds(waitTime);
+
+        sendQueueMessages(10);
+
+        boolean allReceived = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(),
+            first.getListner(), second.getListner());
+        assertThat(allReceived).isEqualTo(true);
+
+        boolean balanced = VerifyUtils.verifyBalance(producer.getAllMsgBody().size(),
+            receivedCount(first), receivedCount(second));
+        assertThat(balanced).isEqualTo(true);
+
+        assertConsumedInOrder(first);
+        assertConsumedInOrder(second);
+    }
+
+    /** Four consumers in the same group; also logs how many bodies each listener saw. */
+    @Test
+    public void testFourConsuemrBalance() {
+        RMQNormalConsumer first = getConsumer(nsAddr, topic, "*", new RMQOrderListener());
+        String group = first.getConsumerGroup();
+        RMQNormalConsumer second = getConsumer(nsAddr, group, topic, "*", new RMQOrderListener());
+        RMQNormalConsumer third = getConsumer(nsAddr, group, topic, "*", new RMQOrderListener());
+        RMQNormalConsumer fourth = getConsumer(nsAddr, group, topic, "*", new RMQOrderListener());
+        TestUtils.waitForSeconds(waitTime);
+
+        sendQueueMessages(20);
+
+        boolean allReceived = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(),
+            first.getListner(), second.getListner(), third.getListner(), fourth.getListner());
+        assertThat(allReceived).isEqualTo(true);
+
+        boolean balanced = VerifyUtils.verifyBalance(producer.getAllMsgBody().size(),
+            receivedCount(first), receivedCount(second),
+            receivedCount(third), receivedCount(fourth));
+        // Log the raw per-listener counts before asserting so they are available on failure.
+        logger.info(String.format("consumer1:%s;consumer2:%s;consumer3:%s,consumer4:%s",
+            first.getListner().getAllMsgBody().size(),
+            second.getListner().getAllMsgBody().size(),
+            third.getListner().getAllMsgBody().size(),
+            fourth.getListner().getAllMsgBody().size()));
+        assertThat(balanced).isEqualTo(true);
+
+        assertConsumedInOrder(first);
+        assertConsumedInOrder(second);
+        assertConsumedInOrder(third);
+        assertConsumedInOrder(fourth);
+    }
+
+    /** Builds a {@link MessageQueueMsg} over the producer's queues and sends it. */
+    private void sendQueueMessages(int msgSize) {
+        List<MessageQueue> queues = producer.getMessageQueue();
+        MessageQueueMsg batch = new MessageQueueMsg(queues, msgSize);
+        producer.send(batch.getMsgsWithMQ());
+    }
+
+    /**
+     * Size of {@code VerifyUtils.getFilterdMessage} applied to the produced bodies and
+     * the de-duplicated bodies recorded by the consumer's listener.
+     */
+    private int receivedCount(RMQNormalConsumer consumer) {
+        return VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer.getListner().getAllUndupMsgBody()).size();
+    }
+
+    /** Asserts that {@code VerifyUtils.verifyOrder} accepts the listener's messages. */
+    private void assertConsumedInOrder(RMQNormalConsumer consumer) {
+        RMQOrderListener listener = (RMQOrderListener) consumer.getListner();
+        assertThat(VerifyUtils.verifyOrder(listener.getMsgs())).isEqualTo(true);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgWithTagIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgWithTagIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgWithTagIT.java new file mode 100644 index 0000000..7db77de --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/order/OrderMsgWithTagIT.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.rocketmq.test.client.producer.order;
+
+import java.util.List;
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.listener.rmq.order.RMQOrderListener;
+import org.apache.rocketmq.test.message.MessageQueueMsg;
+import org.apache.rocketmq.test.util.MQWait;
+import org.apache.rocketmq.test.util.VerifyUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+/**
+ * Integration test for tagged messages consumed through {@link RMQOrderListener}
+ * with different tag subscriptions ("*", a single tag, "tag1||tag2").
+ */
+public class OrderMsgWithTagIT extends BaseConf {
+    // Fixed: the logger was created for OrderMsgIT.class, which mislabelled this test's log lines.
+    private static Logger logger = Logger.getLogger(OrderMsgWithTagIT.class);
+    private RMQNormalProducer producer = null;
+    private String topic = null;
+
+    /** Creates a new topic and a producer for it before each test. */
+    @Before
+    public void setUp() {
+        topic = initTopic();
+        logger.info(String.format("use topic: %s;", topic));
+        producer = getProducer(nsAddr, topic);
+    }
+
+    /** Delegates cleanup to {@code BaseConf.shutDown()}. */
+    @After
+    public void tearDown() {
+        shutDown();
+    }
+
+    /** Messages carry a tag; the consumer subscribes with "*". */
+    @Test
+    public void testOrderMsgWithTagSubAll() {
+        int msgSize = 10;
+        String tag = "jueyin_tag";
+        RMQNormalConsumer consumer = getConsumer(nsAddr, topic, "*", new RMQOrderListener());
+
+        List<MessageQueue> mqs = producer.getMessageQueue();
+        MessageQueueMsg mqMsgs = new MessageQueueMsg(mqs, msgSize, tag);
+        producer.send(mqMsgs.getMsgsWithMQ());
+
+        consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
+
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer.getListner().getAllMsgBody()))
+            .containsExactlyElementsIn(mqMsgs.getMsgBodys());
+
+        assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer.getListner()).getMsgs()))
+            .isEqualTo(true);
+    }
+
+    /** Messages carry a tag; the consumer subscribes to exactly that tag. */
+    @Test
+    public void testOrderMsgWithTagSubTag() {
+        int msgSize = 5;
+        String tag = "jueyin_tag";
+        RMQNormalConsumer consumer = getConsumer(nsAddr, topic, tag, new RMQOrderListener());
+
+        List<MessageQueue> mqs = producer.getMessageQueue();
+        MessageQueueMsg mqMsgs = new MessageQueueMsg(mqs, msgSize, tag);
+        producer.send(mqMsgs.getMsgsWithMQ());
+
+        consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
+
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer.getListner().getAllMsgBody()))
+            .containsExactlyElementsIn(mqMsgs.getMsgBodys());
+
+        assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer.getListner()).getMsgs()))
+            .isEqualTo(true);
+    }
+
+    /** Sends a tag2 batch and then a tag1 batch; the consumer subscribes to tag1 only. */
+    @Test
+    public void testOrderMsgWithTag1AndTag2SubTag1() {
+        int msgSize = 5;
+        String tag1 = "jueyin_tag_1";
+        String tag2 = "jueyin_tag_2";
+        RMQNormalConsumer consumer = getConsumer(nsAddr, topic, tag1, new RMQOrderListener());
+
+        List<MessageQueue> mqs = producer.getMessageQueue();
+
+        MessageQueueMsg mqMsgs = new MessageQueueMsg(mqs, msgSize, tag2);
+        producer.send(mqMsgs.getMsgsWithMQ());
+        // NOTE(review): presumably resets the producer's recorded bodies so that the
+        // checks below only cover the tag1 batch -- confirm against RMQNormalProducer.
+        producer.clearMsg();
+
+        mqMsgs = new MessageQueueMsg(mqs, msgSize, tag1);
+        producer.send(mqMsgs.getMsgsWithMQ());
+
+        consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
+
+        assertThat(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer.getListner().getAllMsgBody()))
+            .containsExactlyElementsIn(mqMsgs.getMsgBodys());
+
+        assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer.getListner()).getMsgs()))
+            .isEqualTo(true);
+    }
+
+    /** Two consumers, one per tag; both batches together must be consumed, each in order. */
+    @Test
+    public void testTwoConsumerSubTag() {
+        int msgSize = 10;
+        String tag1 = "jueyin_tag_1";
+        String tag2 = "jueyin_tag_2";
+        RMQNormalConsumer consumer1 = getConsumer(nsAddr, topic, tag1,
+            new RMQOrderListener("consumer1"));
+        RMQNormalConsumer consumer2 = getConsumer(nsAddr, topic, tag2,
+            new RMQOrderListener("consumer2"));
+        List<MessageQueue> mqs = producer.getMessageQueue();
+
+        MessageQueueMsg mqMsgs = new MessageQueueMsg(mqs, msgSize, tag1);
+        producer.send(mqMsgs.getMsgsWithMQ());
+
+        mqMsgs = new MessageQueueMsg(mqs, msgSize, tag2);
+        producer.send(mqMsgs.getMsgsWithMQ());
+
+        boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(),
+            consumer1.getListner(), consumer2.getListner());
+        assertThat(recvAll).isEqualTo(true);
+
+        assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer1.getListner()).getMsgs()))
+            .isEqualTo(true);
+        assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer2.getListner()).getMsgs()))
+            .isEqualTo(true);
+    }
+
+    /** One consumer subscribed to "tag1||tag2" must consume both batches in order. */
+    @Test
+    public void testConsumeTwoTag() {
+        int msgSize = 10;
+        String tag1 = "jueyin_tag_1";
+        String tag2 = "jueyin_tag_2";
+        RMQNormalConsumer consumer = getConsumer(nsAddr, topic,
+            String.format("%s||%s", tag1, tag2), new RMQOrderListener());
+
+        List<MessageQueue> mqs = producer.getMessageQueue();
+
+        MessageQueueMsg mqMsgs = new MessageQueueMsg(mqs, msgSize, tag1);
+        producer.send(mqMsgs.getMsgsWithMQ());
+
+        mqMsgs = new MessageQueueMsg(mqs, msgSize, tag2);
+        producer.send(mqMsgs.getMsgsWithMQ());
+
+        boolean recvAll = MQWait.waitConsumeAll(consumeTime, producer.getAllMsgBody(),
+            consumer.getListner());
+        assertThat(recvAll).isEqualTo(true);
+
+        assertThat(VerifyUtils.verifyOrder(((RMQOrderListener) consumer.getListner()).getMsgs()))
+            .isEqualTo(true);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByIdExceptionIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByIdExceptionIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByIdExceptionIT.java
new file mode 100644
index 0000000..a1520a0
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByIdExceptionIT.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * 
contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.producer.querymsg;
+
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+/**
+ * Integration test for {@code viewMessage} called with an invalid message id:
+ * no message may be returned, whether the call throws or not.
+ */
+public class QueryMsgByIdExceptionIT extends BaseConf {
+    // Fixed: the logger was created for QueryMsgByKeyIT.class.
+    private static Logger logger = Logger.getLogger(QueryMsgByIdExceptionIT.class);
+    private static RMQNormalProducer producer = null;
+    private static String topic = null;
+
+    /** Creates one topic and one producer shared by all tests of this class. */
+    @BeforeClass
+    public static void setUp() {
+        topic = initTopic();
+        logger.info(String.format("use topic: %s;", topic));
+        producer = getProducer(nsAddr, topic);
+    }
+
+    /** Delegates cleanup to {@code BaseConf.shutDown()}. */
+    @AfterClass
+    public static void tearDown() {
+        shutDown();
+    }
+
+    /** Looks up a message id that is not a valid id string. */
+    @Test
+    public void testQueryMsgByErrorMsgId() {
+        producer.clearMsg();
+        int msgSize = 20;
+        String errorMsgId = "errorMsgId";
+        producer.send(msgSize);
+        Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
+
+        MessageExt queryMsg = null;
+        try {
+            queryMsg = producer.getProducer().viewMessage(errorMsgId);
+        } catch (Exception e) {
+            // A failure is acceptable here, but record it instead of dropping it silently.
+            logger.info(String.format("viewMessage failed for msgId %s", errorMsgId), e);
+        }
+
+        assertThat(queryMsg).isNull();
+    }
+
+    /** Looks up a {@code null} message id. */
+    @Test
+    public void testQueryMsgByNullMsgId() {
+        producer.clearMsg();
+        int msgSize = 20;
+        String errorMsgId = null;
+        producer.send(msgSize);
+        Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
+
+        MessageExt queryMsg = null;
+        try {
+            queryMsg = producer.getProducer().viewMessage(errorMsgId);
+        } catch (Exception e) {
+            // A failure is acceptable here, but record it instead of dropping it silently.
+            logger.info(String.format("viewMessage failed for msgId %s", errorMsgId), e);
+        }
+
+        assertThat(queryMsg).isNull();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByIdIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByIdIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByIdIT.java
new file mode 100644
index 0000000..92c40c7
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByIdIT.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.producer.querymsg;
+
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.common.message.MessageClientExt;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
+import org.apache.rocketmq.test.util.TestUtils;
+import org.apache.rocketmq.test.util.VerifyUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+/**
+ * Integration test: a consumed message can be fetched again with
+ * {@code viewMessage} using its offset message id, and the body matches.
+ */
+public class QueryMsgByIdIT extends BaseConf {
+    private static Logger logger = Logger.getLogger(QueryMsgByIdIT.class);
+    private RMQNormalProducer producer = null;
+    private RMQNormalConsumer consumer = null;
+    private String topic = null;
+
+    /** Creates a new topic with one producer and one "*" consumer before each test. */
+    @Before
+    public void setUp() {
+        topic = initTopic();
+        logger.info(String.format("use topic: %s;", topic));
+        producer = getProducer(nsAddr, topic);
+        consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
+    }
+
+    /** Delegates cleanup to {@code BaseConf.shutDown()}. */
+    @After
+    public void tearDown() {
+        shutDown();
+    }
+
+    /** Sends 20 messages, consumes them, then looks the first consumed one up by id. */
+    @Test
+    public void testQueryMsg() {
+        int msgSize = 20;
+        producer.send(msgSize);
+        Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
+        consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
+        Assert.assertEquals("Not all are consumed", 0, VerifyUtils.verify(producer.getAllMsgBody(),
+            consumer.getListner().getAllMsgBody()));
+
+        MessageExt recvMsg = (MessageExt) consumer.getListner().getFirstMsg();
+        MessageExt queryMsg = null;
+        try {
+            // The lookup is preceded by a fixed 3000 wait, as in the original test.
+            TestUtils.waitForMonment(3000);
+            queryMsg = producer.getProducer().viewMessage(((MessageClientExt) recvMsg).getOffsetMsgId());
+        } catch (Exception e) {
+            // Fixed: the exception used to be swallowed, so a failed lookup surfaced only
+            // as "expected not null" below with no hint of the cause.
+            logger.error("viewMessage failed for the first consumed message", e);
+        }
+
+        assertThat(queryMsg).isNotNull();
+        assertThat(new String(queryMsg.getBody())).isEqualTo(new String(recvMsg.getBody()));
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByKeyIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByKeyIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByKeyIT.java
new file mode 100644
index 0000000..ec45a29
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/querymsg/QueryMsgByKeyIT.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.producer.querymsg;
+
+import java.util.List;
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.factory.MQMessageFactory;
+import org.apache.rocketmq.test.util.TestUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+/** Integration test for {@code queryMessage}, which looks messages up by key. */
+public class QueryMsgByKeyIT extends BaseConf {
+    private static Logger logger = Logger.getLogger(QueryMsgByKeyIT.class);
+    private RMQNormalProducer producer = null;
+    private String topic = null;
+
+    /** Creates a new topic and a producer for it before each test. */
+    @Before
+    public void setUp() {
+        topic = initTopic();
+        logger.info(String.format("use topic: %s;", topic));
+        producer = getProducer(nsAddr, topic);
+    }
+
+    /** Delegates cleanup to {@code BaseConf.shutDown()}. */
+    @After
+    public void tearDown() {
+        shutDown();
+    }
+
+    /** Sends 20 keyed messages and expects a single query to return all of them. */
+    @Test
+    public void testQueryMsg() {
+        int msgSize = 20;
+        String key = "jueyin";
+        long begin = System.currentTimeMillis();
+        List<Object> msgs = MQMessageFactory.getKeyMsg(topic, key, msgSize);
+        producer.send(msgs);
+        Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
+
+        List<MessageExt> queryMsgs = null;
+        try {
+            TestUtils.waitForMonment(500 * 3);
+            queryMsgs = producer.getProducer().queryMessage(topic, key, msgSize, begin - 5000,
+                System.currentTimeMillis() + 5000).getMessageList();
+        } catch (Exception e) {
+            // Fixed: was an empty catch; keep the cause for the assertion failure below.
+            logger.error(String.format("queryMessage failed for key %s", key), e);
+        }
+
+        assertThat(queryMsgs).isNotNull();
+        assertThat(queryMsgs.size()).isEqualTo(msgSize);
+    }
+
+    /**
+     * Sends 500 keyed messages and expects the query to return exactly
+     * {@code 64 * brokerNum} of them, retrying the query up to three more times.
+     */
+    @Test
+    public void testQueryMax() {
+        int msgSize = 500;
+        int max = 64 * brokerNum;
+        String key = "jueyin";
+        long begin = System.currentTimeMillis();
+        List<Object> msgs = MQMessageFactory.getKeyMsg(topic, key, msgSize);
+        producer.send(msgs);
+        Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
+
+        List<MessageExt> queryMsgs = null;
+        try {
+            queryMsgs = producer.getProducer().queryMessage(topic, key, msgSize, begin - 15000,
+                System.currentTimeMillis() + 15000).getMessageList();
+
+            // Fixed: the loop used to test size() != brokerNum, so it re-queried and slept
+            // at least once even when the first answer already had max entries. Retry only
+            // while the target size has not been reached, waiting before each retry.
+            int retries = 3;
+            while ((queryMsgs == null || queryMsgs.size() != max) && retries > 0) {
+                retries--;
+                TestUtils.waitForMonment(1000);
+                queryMsgs = producer.getProducer().queryMessage(topic, key, msgSize, begin - 15000,
+                    System.currentTimeMillis() + 15000).getMessageList();
+            }
+        } catch (Exception e) {
+            // Fixed: was an empty catch; keep the cause for the assertion failure below.
+            logger.error(String.format("queryMessage failed for key %s", key), e);
+        }
+
+        assertThat(queryMsgs).isNotNull();
+        assertThat(queryMsgs.size()).isEqualTo(max);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/delay/DelayConf.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/rocketmq/test/delay/DelayConf.java b/test/src/test/java/org/apache/rocketmq/test/delay/DelayConf.java
new file mode 100644
index 0000000..6c23473
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/delay/DelayConf.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.delay;
+
+import org.apache.rocketmq.test.base.BaseConf;
+
+/** Base class for the delay tests; holds the table of expected delays. */
+public class DelayConf extends BaseConf {
+    // Unit helpers for the table below. NOTE(review): entries are presumably seconds,
+    // since NormalMsgDelayIT multiplies them by 1000 before use -- confirm.
+    private static final int MINUTE = 60;
+    private static final int HOUR = 3600;
+
+    /** Expected delay per delay level; level {@code n} is stored at index {@code n - 1}. */
+    protected static final int[] DELAY_LEVEL = {
+        1,
+        5,
+        10,
+        30,
+        MINUTE,
+        5 * MINUTE,
+        10 * MINUTE,
+        30 * MINUTE,
+        HOUR,
+        2 * HOUR,
+        6 * HOUR,
+        12 * HOUR,
+        24 * HOUR
+    };
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/delay/NormalMsgDelayIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/rocketmq/test/delay/NormalMsgDelayIT.java b/test/src/test/java/org/apache/rocketmq/test/delay/NormalMsgDelayIT.java
new file mode 100644
index 0000000..5206dcb
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/delay/NormalMsgDelayIT.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.delay;
+
+import java.util.List;
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.test.client.consumer.balance.NormalMsgStaticBalanceIT;
+import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.factory.MQMessageFactory;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQDelayListner;
+import org.apache.rocketmq.test.listener.rmq.order.RMQOrderListener;
+import org.apache.rocketmq.test.util.VerifyUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Integration test for delayed messages at delay levels 1 to 4.
+ *
+ * <p>Each test sends {@link #msgSize} delayed messages, waits for them to be
+ * consumed, and checks the delays recorded by the {@link RMQDelayListner}
+ * against {@code DELAY_LEVEL[delayLevel - 1] * 1000} with
+ * {@code VerifyUtils.verifyDelay}.
+ */
+public class NormalMsgDelayIT extends DelayConf {
+    // Fixed: the logger was created for NormalMsgStaticBalanceIT.class.
+    private static Logger logger = Logger.getLogger(NormalMsgDelayIT.class);
+    protected int msgSize = 100;
+    private RMQNormalProducer producer = null;
+    private RMQNormalConsumer consumer = null;
+    private String topic = null;
+
+    /** Creates a new topic with one producer and one "*" consumer before each test. */
+    @Before
+    public void setUp() {
+        topic = initTopic();
+        logger.info(String.format("use topic: %s;", topic));
+        producer = getProducer(nsAddr, topic);
+        // Fixed: an RMQOrderListener was registered here, but every test casts the
+        // listener to RMQDelayListner to read getMsgDelayTimes().
+        consumer = getConsumer(nsAddr, topic, "*", new RMQDelayListner());
+    }
+
+    /** Delegates cleanup to {@code BaseConf.shutDown()}. */
+    @After
+    public void tearDown() {
+        super.shutDown();
+    }
+
+    /** Delay level 1; waits up to {@code consumeTime} for consumption. */
+    @Test
+    public void testDelayLevell() {
+        int delayLevel = 1;
+        List<Object> delayMsgs = MQMessageFactory.getDelayMsg(topic, delayLevel, msgSize);
+        producer.send(delayMsgs);
+        Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size());
+
+        consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
+        Assert.assertEquals("Not all are consumed", 0, VerifyUtils.verify(producer.getAllMsgBody(),
+            consumer.getListner().getAllMsgBody()));
+        Assert.assertEquals("Timer is not correct", true,
+            VerifyUtils.verifyDelay(DELAY_LEVEL[delayLevel - 1] * 1000,
+                ((RMQDelayListner) consumer.getListner()).getMsgDelayTimes()));
+    }
+
+    /** Delay level 2; waits up to twice the expected delay for consumption. */
+    @Test
+    public void testDelayLevel2() {
+        int delayLevel = 2;
+        List<Object> delayMsgs = MQMessageFactory.getDelayMsg(topic, delayLevel, msgSize);
+        producer.send(delayMsgs);
+        Assert.assertEquals("Not all sent succeeded", msgSize, producer.getAllUndupMsgBody().size());
+
+        consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(),
+            DELAY_LEVEL[delayLevel - 1] * 1000 * 2);
+        Assert.assertEquals("Not all are consumed", 0, VerifyUtils.verify(producer.getAllMsgBody(),
+            consumer.getListner().getAllMsgBody()));
+        Assert.assertEquals("Timer is not correct", true,
+            VerifyUtils.verifyDelay(DELAY_LEVEL[delayLevel - 1] * 1000,
+                ((RMQDelayListner) consumer.getListner()).getMsgDelayTimes()));
+    }
+
+    /** Delay level 3; waits up to twice the expected delay for consumption. */
+    @Test
+    public void testDelayLevel3() {
+        int delayLevel = 3;
+        List<Object> delayMsgs = MQMessageFactory.getDelayMsg(topic, delayLevel, msgSize);
+        producer.send(delayMsgs);
+        Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
+
+        consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(),
+            DELAY_LEVEL[delayLevel - 1] * 1000 * 2);
+        Assert.assertEquals("Not all are consumed", 0, VerifyUtils.verify(producer.getAllMsgBody(),
+            consumer.getListner().getAllMsgBody()));
+        Assert.assertEquals("Timer is not correct", true,
+            VerifyUtils.verifyDelay(DELAY_LEVEL[delayLevel - 1] * 1000,
+                ((RMQDelayListner) consumer.getListner()).getMsgDelayTimes()));
+    }
+
+    /** Delay level 4; waits up to twice the expected delay for consumption. */
+    @Test
+    public void testDelayLevel4() {
+        int delayLevel = 4;
+        List<Object> delayMsgs = MQMessageFactory.getDelayMsg(topic, delayLevel, msgSize);
+        producer.send(delayMsgs);
+        Assert.assertEquals("Not all are sent", msgSize, producer.getAllUndupMsgBody().size());
+
+        consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(),
+            DELAY_LEVEL[delayLevel - 1] * 1000 * 2);
+        Assert.assertEquals("Not all are consumed", 0, VerifyUtils.verify(producer.getAllMsgBody(),
+            consumer.getListner().getAllMsgBody()));
+        Assert.assertEquals("Timer is not correct", true,
+            VerifyUtils.verifyDelay(DELAY_LEVEL[delayLevel - 1] * 1000,
+                ((RMQDelayListner) consumer.getListner()).getMsgDelayTimes()));
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java b/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java
new file mode 100644
index 0000000..c422501
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.smoke;
+
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
+import org.apache.rocketmq.test.util.VerifyUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+/** Smoke test: messages sent by one producer are all received by one "*" consumer. */
+public class NormalMessageSendAndRecvIT extends BaseConf {
+    private static Logger logger = Logger.getLogger(NormalMessageSendAndRecvIT.class);
+    private RMQNormalConsumer consumer = null;
+    private RMQNormalProducer producer = null;
+    private String topic = null;
+
+    /** Creates a new topic, then its producer, then its consumer, before each test. */
+    @Before
+    public void setUp() {
+        topic = initTopic();
+        logger.info(String.format("use topic: %s;", topic));
+        producer = getProducer(nsAddr, topic);
+        consumer = getConsumer(nsAddr, topic, "*", new RMQNormalListner());
+    }
+
+    /** Delegates cleanup to {@code BaseConf.shutDown()}. */
+    @After
+    public void tearDown() {
+        shutDown();
+    }
+
+    /**
+     * Sends 10 messages, waits for consumption, and checks that the filtered
+     * consumed bodies are exactly the produced bodies.
+     */
+    @Test
+    public void testSynSendMessage() {
+        final int sendCount = 10;
+        producer.send(sendCount);
+        Assert.assertEquals("Not all sent succeeded", sendCount,
+            producer.getAllUndupMsgBody().size());
+
+        consumer.getListner().waitForMessageConsume(producer.getAllMsgBody(), consumeTime);
+
+        assertThat(
+            VerifyUtils.getFilterdMessage(
+                producer.getAllMsgBody(),
+                consumer.getListner().getAllMsgBody()))
+            .containsExactlyElementsIn(producer.getAllMsgBody());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/test/src/test/resources/log4j.xml b/test/src/test/resources/log4j.xml
new file mode 100644
index 0000000..3031095
--- /dev/null
+++ b/test/src/test/resources/log4j.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+ ~ Licensed to the Apache 
Software Foundation (ASF) under one or more
+  ~ contributor license agreements. See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License. You may obtain a copy of the License at
+  ~
+  ~ http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/"> <!-- log4j 1.x configuration for the integration tests: two appenders are declared, only TOTAL is attached to the root logger. -->
+    <appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender"> <!-- Declared but not referenced by any appender-ref in this file, so nothing here routes events to it. -->
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%d %p [%F %M %L]: %m%n"/>
+        </layout>
+        <filter class="org.apache.log4j.varia.LevelRangeFilter"> <!-- Level window TRACE..ERROR; presumably drops FATAL - confirm against LevelRangeFilter semantics. -->
+            <param name="LevelMax" value="ERROR"/>
+            <param name="LevelMin" value="TRACE"/>
+        </filter>
+    </appender>
+
+    <appender name="TOTAL" class="org.apache.log4j.RollingFileAppender"> <!-- File appender targeting the relative path it_test.log; the only appender the root logger uses. -->
+        <param name="File" value="it_test.log"/>
+        <param name="append" value="false"/> <!-- Presumably truncates an existing it_test.log on start-up rather than appending - confirm. -->
+        <param name="MaxFileSize" value="10240000"/> <!-- No unit suffix; presumably bytes (about 10 MB) - confirm. -->
+        <param name="MaxBackupIndex" value="10"/>
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%d - %c -%-4r [%t] %-5p %x - %m%n"/>
+        </layout>
+    </appender>
+
+    <root>
+        <level value="OFF"/> <!-- NOTE(review): root level OFF should suppress every event reaching the root logger, leaving TOTAL idle unless a logger overrides the level elsewhere - confirm this is intentional. -->
+        <appender-ref ref="TOTAL"/>
+    </root>
+</log4j:configuration>
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/788771a8/test/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git
a/test/src/test/resources/logback-test.xml b/test/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..eb12a9a
--- /dev/null
+++ b/test/src/test/resources/logback-test.xml
@@ -0,0 +1,33 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements. See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License. You may obtain a copy of the License at
+  ~
+  ~ http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration> <!-- Logback configuration for the test classpath: a single console appender attached to the root logger. -->
+
+    <appender name="DefaultAppender" class="ch.qos.logback.core.ConsoleAppender">
+        <append>true</append> <!-- NOTE(review): "append" is documented as a FileAppender property; ConsoleAppender does not appear to define it - confirm whether logback ignores or reports this element and whether it can be dropped. -->
+        <encoder>
+            <pattern>%d{yyy-MM-dd HH\:mm\:ss,GMT+8} %p %t - %m%n</pattern> <!-- Timestamp in the GMT+8 zone, then level, thread and message. NOTE(review): "yyy" looks like a typo for "yyyy" - confirm the rendered year before changing. -->
+            <charset class="java.nio.charset.Charset">UTF-8</charset>
+        </encoder>
+    </appender>
+
+    <root>
+        <level value="OFF"/> <!-- NOTE(review): root level OFF should silence all output through DefaultAppender unless a logger overrides the level elsewhere - confirm this is intentional. -->
+        <appender-ref ref="DefaultAppender"/>
+    </root>
+</configuration>
