Add integration test: SharedDurableConsumeTest
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/commit/7d4d8a4d Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/tree/7d4d8a4d Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/diff/7d4d8a4d Branch: refs/heads/jms-dev-1.1.0 Commit: 7d4d8a4d42b94afea44c2bbd2790ea14bf7e5d30 Parents: e1a80ff Author: zhangke <zhangke_beij...@qq.com> Authored: Wed Mar 1 08:28:14 2017 +0800 Committer: zhangke <zhangke_beij...@qq.com> Committed: Wed Mar 1 08:28:14 2017 +0800 ---------------------------------------------------------------------- .../rocketmq/jms/DeliverMessageService.java | 2 +- .../rocketmq/jms/integration/Constant.java | 2 + .../jms/integration/NonDurableConsumeTest.java | 123 +++++++++++++++++++ .../integration/SharedDurableConsumeTest.java | 104 ++++++++++++++++ .../jms/integration/UnDurableConsumeTest.java | 123 ------------------- 5 files changed, 230 insertions(+), 124 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/7d4d8a4d/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java b/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java index 5cc412e..e568d4c 100644 --- a/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java +++ b/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java @@ -106,7 +106,7 @@ public class DeliverMessageService extends ServiceThread { this.rocketMQPullConsumer.start(); } catch (MQClientException e) { - throw new JMSRuntimeException("Fail to start RocketMQ pull consumer, error msg:%s", ExceptionUtils.getStackTrace(e)); + throw new JMSRuntimeException(format("Fail to 
start RocketMQ pull consumer, error msg:%s", ExceptionUtils.getStackTrace(e))); } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/7d4d8a4d/test/src/main/java/org/apache/rocketmq/jms/integration/Constant.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/rocketmq/jms/integration/Constant.java b/test/src/main/java/org/apache/rocketmq/jms/integration/Constant.java index 36514ae..fd53608 100644 --- a/test/src/main/java/org/apache/rocketmq/jms/integration/Constant.java +++ b/test/src/main/java/org/apache/rocketmq/jms/integration/Constant.java @@ -34,4 +34,6 @@ public class Constant { public static final int BROKER_HA_PORT = 9043; public static final String CLIENT_ID = "coffee"; + + public static final String CLIENT_ID_SECOND = "tea"; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/7d4d8a4d/test/src/test/java/org/apache/rocketmq/jms/integration/NonDurableConsumeTest.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/jms/integration/NonDurableConsumeTest.java b/test/src/test/java/org/apache/rocketmq/jms/integration/NonDurableConsumeTest.java new file mode 100644 index 0000000..82f73fb --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/jms/integration/NonDurableConsumeTest.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.jms.integration; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import org.apache.rocketmq.jms.RocketMQConnectionFactory; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; + +@RunWith(SpringJUnit4ClassRunner.class) +@ContextConfiguration(classes = AppConfig.class) +public class NonDurableConsumeTest { + + @Autowired + private RocketMQAdmin rocketMQAdmin; + + /** + * Test messages that producer after consumer inactive will not be delivered to consumer when it start again. + * + * <p>Test step: + * 1. Create a consumer and start the connection + * 2. Create a producer and send a message(msgA) to the topic subscribed by previous consumer + * 3. MsgA should be consumed successfully + * 4. Close the consumer and stop the connection + * 5. Producer sends a message(msgB) after the consumer closed + * 6. Create another consumer which is a non-durable one, and start the connection + * 7. 
Result: msgB should be consumed by the previous non-durable consumer + * + * @throws Exception + */ + @Test + public void testConsumeNotDurable() throws Exception { + final String rmqTopicName = "coffee-syn" + UUID.randomUUID().toString(); + rocketMQAdmin.createTopic(rmqTopicName); + + ConnectionFactory factory = new RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS, Constant.CLIENT_ID); + Connection connection = factory.createConnection(); + Session session = connection.createSession(); + connection.start(); + Topic topic = session.createTopic(rmqTopicName); + + try { + //consumer + final List<Message> received = new ArrayList(); + final MessageListener msgListener = new MessageListener() { + @Override public void onMessage(Message message) { + received.add(message); + } + }; + MessageConsumer consumer = session.createConsumer(topic); + consumer.setMessageListener(msgListener); + + connection.start(); + + //producer + TextMessage message = session.createTextMessage("a"); + MessageProducer producer = session.createProducer(topic); + producer.send(message); + + Thread.sleep(1000 * 2); + + assertThat(received.size(), is(1)); + received.clear(); + + // close the consumer + connection.stop(); + consumer.close(); + + // send message + TextMessage lostMessage = session.createTextMessage("b"); + producer.send(lostMessage); + + Thread.sleep(1000 * 2); + + // start the non-durable consumer again + consumer = session.createConsumer(topic, "topic"); + consumer.setMessageListener(msgListener); + connection.start(); + + Thread.sleep(1000 * 3); + + assertThat(received.size(), is(0)); + + } + finally { + connection.close(); + rocketMQAdmin.deleteTopic(rmqTopicName); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/7d4d8a4d/test/src/test/java/org/apache/rocketmq/jms/integration/SharedDurableConsumeTest.java ---------------------------------------------------------------------- diff --git 
a/test/src/test/java/org/apache/rocketmq/jms/integration/SharedDurableConsumeTest.java b/test/src/test/java/org/apache/rocketmq/jms/integration/SharedDurableConsumeTest.java new file mode 100644 index 0000000..715f70c --- /dev/null +++ b/test/src/test/java/org/apache/rocketmq/jms/integration/SharedDurableConsumeTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rocketmq.jms.integration; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import org.apache.rocketmq.jms.RocketMQConnectionFactory; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; + +@RunWith(SpringJUnit4ClassRunner.class) +@ContextConfiguration(classes = AppConfig.class) +public class SharedDurableConsumeTest { + + @Autowired + private RocketMQAdmin rocketMQAdmin; + + @Test + public void test() throws Exception { + final String rmqTopicName = "coffee-syn" + UUID.randomUUID().toString(); + rocketMQAdmin.createTopic(rmqTopicName); + + ConnectionFactory factory = new RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS, Constant.CLIENT_ID); + Connection connectionA = null, connectionB = null; + final String subscriptionName = "MySubscription"; + final List<Message> receivedA = new ArrayList(), receivedB = new ArrayList(); + + try { + // consumerA + connectionA = factory.createConnection(); + Session sessionA = connectionA.createSession(); + connectionA.start(); + Topic topic = sessionA.createTopic(rmqTopicName); + MessageConsumer consumerA = sessionA.createSharedDurableConsumer(topic, subscriptionName); + consumerA.setMessageListener(new MessageListener() { + @Override public void onMessage(Message message) { + receivedA.add(message); + } + }); + + // consumerB + connectionB = factory.createConnection(); + Session sessionB = 
connectionB.createSession(); + MessageConsumer consumerB = sessionB.createSharedDurableConsumer(topic, subscriptionName); + consumerB.setMessageListener(new MessageListener() { + @Override public void onMessage(Message message) { + receivedB.add(message); + } + }); + + connectionA.start(); + connectionB.start(); + + //producer + TextMessage message = sessionA.createTextMessage("a"); + MessageProducer producer = sessionA.createProducer(topic); + for (int i = 0; i < 10; i++) { + producer.send(message); + } + + Thread.sleep(1000 * 5); + + assertThat(receivedA.size(), is(10)); + assertThat(receivedB.size(), is(10)); + } + finally { + connectionA.close(); + connectionB.close(); + rocketMQAdmin.deleteTopic(rmqTopicName); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/7d4d8a4d/test/src/test/java/org/apache/rocketmq/jms/integration/UnDurableConsumeTest.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/jms/integration/UnDurableConsumeTest.java b/test/src/test/java/org/apache/rocketmq/jms/integration/UnDurableConsumeTest.java deleted file mode 100644 index c03e86d..0000000 --- a/test/src/test/java/org/apache/rocketmq/jms/integration/UnDurableConsumeTest.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.jms.integration; - -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.jms.Topic; -import org.apache.rocketmq.jms.RocketMQConnectionFactory; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.core.Is.is; - -@RunWith(SpringJUnit4ClassRunner.class) -@ContextConfiguration(classes = AppConfig.class) -public class UnDurableConsumeTest { - - @Autowired - private RocketMQAdmin rocketMQAdmin; - - /** - * Test messages that producer after consumer inactive will not be delivered to consumer when it start again. - * - * <p>Test step: - * 1. Create a consumer and start the connection - * 2. Create a producer and send a message(msgA) to the topic subscribed by previous consumer - * 3. MsgA should be consumed successfully - * 4. Close the consumer and stop the connection - * 5. Producer sends a message(msgB) after the consumer closed - * 6. Create another consumer which is a un-durable one, and start the connection - * 7. 
Result: msgB should be consumed by the previous un-durable consumer - * - * @throws Exception - */ - @Test - public void testConsumeNotDurable() throws Exception { - final String rmqTopicName = "coffee-syn" + UUID.randomUUID().toString(); - rocketMQAdmin.createTopic(rmqTopicName); - - ConnectionFactory factory = new RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS, Constant.CLIENT_ID); - Connection connection = factory.createConnection(); - Session session = connection.createSession(); - connection.start(); - Topic topic = session.createTopic(rmqTopicName); - - try { - //consumer - final List<Message> received = new ArrayList(); - final MessageListener msgListener = new MessageListener() { - @Override public void onMessage(Message message) { - received.add(message); - } - }; - MessageConsumer consumer = session.createConsumer(topic); - consumer.setMessageListener(msgListener); - - connection.start(); - - //producer - TextMessage message = session.createTextMessage("a"); - MessageProducer producer = session.createProducer(topic); - producer.send(message); - - Thread.sleep(1000 * 2); - - assertThat(received.size(), is(1)); - received.clear(); - - // close the consumer - connection.stop(); - consumer.close(); - - // send message - TextMessage lostMessage = session.createTextMessage("b"); - producer.send(lostMessage); - - Thread.sleep(1000 * 2); - - // start the un-durable consumer again - consumer = session.createConsumer(topic, "topic"); - consumer.setMessageListener(msgListener); - connection.start(); - - Thread.sleep(1000 * 3); - - assertThat(received.size(), is(0)); - - } - finally { - connection.close(); - rocketMQAdmin.deleteTopic(rmqTopicName); - } - } - -}