Add integration test: SharedDurableConsumerTest

Project: 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/commit/7d4d8a4d
Tree: 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/tree/7d4d8a4d
Diff: 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/diff/7d4d8a4d

Branch: refs/heads/jms-dev-1.1.0
Commit: 7d4d8a4d42b94afea44c2bbd2790ea14bf7e5d30
Parents: e1a80ff
Author: zhangke <zhangke_beij...@qq.com>
Authored: Wed Mar 1 08:28:14 2017 +0800
Committer: zhangke <zhangke_beij...@qq.com>
Committed: Wed Mar 1 08:28:14 2017 +0800

----------------------------------------------------------------------
 .../rocketmq/jms/DeliverMessageService.java     |   2 +-
 .../rocketmq/jms/integration/Constant.java      |   2 +
 .../jms/integration/NonDurableConsumeTest.java  | 123 +++++++++++++++++++
 .../integration/SharedDurableConsumeTest.java   | 104 ++++++++++++++++
 .../jms/integration/UnDurableConsumeTest.java   | 123 -------------------
 5 files changed, 230 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/7d4d8a4d/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java 
b/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java
index 5cc412e..e568d4c 100644
--- a/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java
+++ b/core/src/main/java/org/apache/rocketmq/jms/DeliverMessageService.java
@@ -106,7 +106,7 @@ public class DeliverMessageService extends ServiceThread {
             this.rocketMQPullConsumer.start();
         }
         catch (MQClientException e) {
-            throw new JMSRuntimeException("Fail to start RocketMQ pull 
consumer, error msg:%s", ExceptionUtils.getStackTrace(e));
+            throw new JMSRuntimeException(format("Fail to start RocketMQ pull 
consumer, error msg:%s", ExceptionUtils.getStackTrace(e)));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/7d4d8a4d/test/src/main/java/org/apache/rocketmq/jms/integration/Constant.java
----------------------------------------------------------------------
diff --git 
a/test/src/main/java/org/apache/rocketmq/jms/integration/Constant.java 
b/test/src/main/java/org/apache/rocketmq/jms/integration/Constant.java
index 36514ae..fd53608 100644
--- a/test/src/main/java/org/apache/rocketmq/jms/integration/Constant.java
+++ b/test/src/main/java/org/apache/rocketmq/jms/integration/Constant.java
@@ -34,4 +34,6 @@ public class Constant {
     public static final int BROKER_HA_PORT = 9043;
 
     public static final String CLIENT_ID = "coffee";
+
+    public static final String CLIENT_ID_SECOND = "tea";
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/7d4d8a4d/test/src/test/java/org/apache/rocketmq/jms/integration/NonDurableConsumeTest.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/rocketmq/jms/integration/NonDurableConsumeTest.java
 
b/test/src/test/java/org/apache/rocketmq/jms/integration/NonDurableConsumeTest.java
new file mode 100644
index 0000000..82f73fb
--- /dev/null
+++ 
b/test/src/test/java/org/apache/rocketmq/jms/integration/NonDurableConsumeTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.integration;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import org.apache.rocketmq.jms.RocketMQConnectionFactory;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+@RunWith(SpringJUnit4ClassRunner.class)
+@ContextConfiguration(classes = AppConfig.class)
+public class NonDurableConsumeTest {
+
+    @Autowired
+    private RocketMQAdmin rocketMQAdmin;
+
+    /**
+     * Test messages that producer after consumer inactive will not be 
delivered to consumer when it start again.
+     *
+     * <p>Test step:
+     * 1. Create a consumer and start the connection
+     * 2. Create a producer and send a message(msgA) to the topic subscribed 
by previous consumer
+     * 3. MsgA should be consumed successfully
+     * 4. Close the consumer and stop the connection
+     * 5. Producer sends a message(msgB) after the consumer closed
+     * 6. Create another consumer which is a non-durable one, and start the 
connection
+     * 7. Result: msgB should be consumed by the previous non-durable consumer
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testConsumeNotDurable() throws Exception {
+        final String rmqTopicName = "coffee-syn" + 
UUID.randomUUID().toString();
+        rocketMQAdmin.createTopic(rmqTopicName);
+
+        ConnectionFactory factory = new 
RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS, Constant.CLIENT_ID);
+        Connection connection = factory.createConnection();
+        Session session = connection.createSession();
+        connection.start();
+        Topic topic = session.createTopic(rmqTopicName);
+
+        try {
+            //consumer
+            final List<Message> received = new ArrayList();
+            final MessageListener msgListener = new MessageListener() {
+                @Override public void onMessage(Message message) {
+                    received.add(message);
+                }
+            };
+            MessageConsumer consumer = session.createConsumer(topic);
+            consumer.setMessageListener(msgListener);
+
+            connection.start();
+
+            //producer
+            TextMessage message = session.createTextMessage("a");
+            MessageProducer producer = session.createProducer(topic);
+            producer.send(message);
+
+            Thread.sleep(1000 * 2);
+
+            assertThat(received.size(), is(1));
+            received.clear();
+
+            // close the consumer
+            connection.stop();
+            consumer.close();
+
+            // send message
+            TextMessage lostMessage = session.createTextMessage("b");
+            producer.send(lostMessage);
+
+            Thread.sleep(1000 * 2);
+
+            // start the non-durable consumer again
+            consumer = session.createConsumer(topic, "topic");
+            consumer.setMessageListener(msgListener);
+            connection.start();
+
+            Thread.sleep(1000 * 3);
+
+            assertThat(received.size(), is(0));
+
+        }
+        finally {
+            connection.close();
+            rocketMQAdmin.deleteTopic(rmqTopicName);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/7d4d8a4d/test/src/test/java/org/apache/rocketmq/jms/integration/SharedDurableConsumeTest.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/rocketmq/jms/integration/SharedDurableConsumeTest.java
 
b/test/src/test/java/org/apache/rocketmq/jms/integration/SharedDurableConsumeTest.java
new file mode 100644
index 0000000..715f70c
--- /dev/null
+++ 
b/test/src/test/java/org/apache/rocketmq/jms/integration/SharedDurableConsumeTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.integration;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import org.apache.rocketmq.jms.RocketMQConnectionFactory;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+@RunWith(SpringJUnit4ClassRunner.class)
+@ContextConfiguration(classes = AppConfig.class)
+public class SharedDurableConsumeTest {
+
+    @Autowired
+    private RocketMQAdmin rocketMQAdmin;
+
+    @Test
+    public void test() throws Exception {
+        final String rmqTopicName = "coffee-syn" + 
UUID.randomUUID().toString();
+        rocketMQAdmin.createTopic(rmqTopicName);
+
+        ConnectionFactory factory = new 
RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS, Constant.CLIENT_ID);
+        Connection connectionA = null, connectionB = null;
+        final String subscriptionName = "MySubscription";
+        final List<Message> receivedA = new ArrayList(), receivedB = new 
ArrayList();
+
+        try {
+            // consumerA
+            connectionA = factory.createConnection();
+            Session sessionA = connectionA.createSession();
+            connectionA.start();
+            Topic topic = sessionA.createTopic(rmqTopicName);
+            MessageConsumer consumerA = 
sessionA.createSharedDurableConsumer(topic, subscriptionName);
+            consumerA.setMessageListener(new MessageListener() {
+                @Override public void onMessage(Message message) {
+                    receivedA.add(message);
+                }
+            });
+
+            // consumerB
+            connectionB = factory.createConnection();
+            Session sessionB = connectionB.createSession();
+            MessageConsumer consumerB = 
sessionB.createSharedDurableConsumer(topic, subscriptionName);
+            consumerB.setMessageListener(new MessageListener() {
+                @Override public void onMessage(Message message) {
+                    receivedB.add(message);
+                }
+            });
+
+            connectionA.start();
+            connectionB.start();
+
+            //producer
+            TextMessage message = sessionA.createTextMessage("a");
+            MessageProducer producer = sessionA.createProducer(topic);
+            for (int i = 0; i < 10; i++) {
+                producer.send(message);
+            }
+
+            Thread.sleep(1000 * 5);
+
+            assertThat(receivedA.size(), is(10));
+            assertThat(receivedB.size(), is(10));
+        }
+        finally {
+            connectionA.close();
+            connectionB.close();
+            rocketMQAdmin.deleteTopic(rmqTopicName);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/7d4d8a4d/test/src/test/java/org/apache/rocketmq/jms/integration/UnDurableConsumeTest.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/rocketmq/jms/integration/UnDurableConsumeTest.java
 
b/test/src/test/java/org/apache/rocketmq/jms/integration/UnDurableConsumeTest.java
deleted file mode 100644
index c03e86d..0000000
--- 
a/test/src/test/java/org/apache/rocketmq/jms/integration/UnDurableConsumeTest.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.jms.integration;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import org.apache.rocketmq.jms.RocketMQConnectionFactory;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.test.context.ContextConfiguration;
-import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.Is.is;
-
-@RunWith(SpringJUnit4ClassRunner.class)
-@ContextConfiguration(classes = AppConfig.class)
-public class UnDurableConsumeTest {
-
-    @Autowired
-    private RocketMQAdmin rocketMQAdmin;
-
-    /**
-     * Test messages that producer after consumer inactive will not be 
delivered to consumer when it start again.
-     *
-     * <p>Test step:
-     * 1. Create a consumer and start the connection
-     * 2. Create a producer and send a message(msgA) to the topic subscribed 
by previous consumer
-     * 3. MsgA should be consumed successfully
-     * 4. Close the consumer and stop the connection
-     * 5. Producer sends a message(msgB) after the consumer closed
-     * 6. Create another consumer which is a un-durable one, and start the 
connection
-     * 7. Result: msgB should be consumed by the previous un-durable consumer
-     *
-     * @throws Exception
-     */
-    @Test
-    public void testConsumeNotDurable() throws Exception {
-        final String rmqTopicName = "coffee-syn" + 
UUID.randomUUID().toString();
-        rocketMQAdmin.createTopic(rmqTopicName);
-
-        ConnectionFactory factory = new 
RocketMQConnectionFactory(Constant.NAME_SERVER_ADDRESS, Constant.CLIENT_ID);
-        Connection connection = factory.createConnection();
-        Session session = connection.createSession();
-        connection.start();
-        Topic topic = session.createTopic(rmqTopicName);
-
-        try {
-            //consumer
-            final List<Message> received = new ArrayList();
-            final MessageListener msgListener = new MessageListener() {
-                @Override public void onMessage(Message message) {
-                    received.add(message);
-                }
-            };
-            MessageConsumer consumer = session.createConsumer(topic);
-            consumer.setMessageListener(msgListener);
-
-            connection.start();
-
-            //producer
-            TextMessage message = session.createTextMessage("a");
-            MessageProducer producer = session.createProducer(topic);
-            producer.send(message);
-
-            Thread.sleep(1000 * 2);
-
-            assertThat(received.size(), is(1));
-            received.clear();
-
-            // close the consumer
-            connection.stop();
-            consumer.close();
-
-            // send message
-            TextMessage lostMessage = session.createTextMessage("b");
-            producer.send(lostMessage);
-
-            Thread.sleep(1000 * 2);
-
-            // start the un-durable consumer again
-            consumer = session.createConsumer(topic, "topic");
-            consumer.setMessageListener(msgListener);
-            connection.start();
-
-            Thread.sleep(1000 * 3);
-
-            assertThat(received.size(), is(0));
-
-        }
-        finally {
-            connection.close();
-            rocketMQAdmin.deleteTopic(rmqTopicName);
-        }
-    }
-
-}

Reply via email to