Repository: activemq-artemis
Updated Branches:
  refs/heads/ARTEMIS-780 67743886d -> a73aa0951


added a few tests


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/a73aa095
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/a73aa095
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/a73aa095

Branch: refs/heads/ARTEMIS-780
Commit: a73aa0951bed8669e031fcd2053d499678eb975c
Parents: 6774388
Author: Howard Gao <howard....@gmail.com>
Authored: Thu Dec 1 20:01:57 2016 +0800
Committer: Howard Gao <howard....@gmail.com>
Committed: Thu Dec 1 20:03:01 2016 +0800

----------------------------------------------------------------------
 .../integration/addressing/AnycastTest.java     | 188 ++++++++++++++++++
 .../integration/addressing/MulticastTest.java   | 189 +++++++++++++++++++
 2 files changed, 377 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a73aa095/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AnycastTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AnycastTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AnycastTest.java
new file mode 100644
index 0000000..bc39d05
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AnycastTest.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.addressing;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.RoutingType;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.TimeUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Transacted send/receive tests against an address that only has the
+ * {@link RoutingType#ANYCAST} routing type and two queues bound to it.
+ * <p>
+ * Both tests send {@link #NUM_MESSAGES} messages to the address and expect each
+ * of the two queues to end up with half of them.
+ */
+public class AnycastTest extends ActiveMQTestBase {
+
+   /** Number of messages sent to the address by every test. */
+   private static final int NUM_MESSAGES = 10;
+
+   private final SimpleString baseAddress = new SimpleString("anycast.address");
+
+   private AddressInfo addressInfo;
+
+   private ActiveMQServer server;
+
+   private ClientSessionFactory sessionFactory;
+
+   /**
+    * Starts an in-VM server, opens a session factory to it and creates the
+    * ANYCAST address used by the tests.
+    */
+   @Before
+   public void setup() throws Exception {
+      server = createServer(true);
+      server.start();
+
+      server.waitForActivation(10, TimeUnit.SECONDS);
+
+      // Register the locator with the test base class as well; previously only the
+      // session factory was registered and the locator itself was never released.
+      ServerLocator sl = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY)));
+      sessionFactory = sl.createSessionFactory();
+
+      addSessionFactory(sessionFactory);
+
+      addressInfo = new AddressInfo(baseAddress);
+      addressInfo.addRoutingType(RoutingType.ANYCAST);
+      server.createOrUpdateAddressInfo(addressInfo);
+   }
+
+   /**
+    * Messages sent in a transaction must not be visible before the commit; after
+    * the commit each queue holds half of them, and once they are received,
+    * acknowledged and committed the queue is empty.
+    */
+   @Test
+   public void testTxCommitReceive() throws Exception {
+
+      Queue q1 = createQueue(".1");
+      Queue q2 = createQueue(".2");
+
+      ClientSession session = sessionFactory.createSession(false, false);
+      session.start();
+
+      ClientConsumer consumer1 = session.createConsumer(q1.getName());
+      ClientConsumer consumer2 = session.createConsumer(q2.getName());
+
+      sendMessages(session);
+
+      // nothing may be delivered while the sending transaction is still open
+      assertNull(consumer1.receive(200));
+      assertNull(consumer2.receive(200));
+      session.commit();
+
+      final int perQueue = NUM_MESSAGES / 2;
+      assertMessageCount(q1, perQueue);
+      assertMessageCount(q2, perQueue);
+
+      ClientConsumer[] consumers = new ClientConsumer[] {consumer1, consumer2};
+      Queue[] queues = new Queue[] {q1, q2};
+      for (int i = 0; i < consumers.length; i++) {
+         receiveAndAcknowledge(consumers[i], i, perQueue);
+         session.commit();
+
+         assertNull(consumers[i].receive(200));
+         // the committed acknowledgements must have removed the messages
+         assertMessageCount(queues[i], 0);
+      }
+
+      q1.deleteQueue();
+      q2.deleteQueue();
+   }
+
+   /**
+    * Messages that were received and acknowledged in a transaction which is then
+    * rolled back must be delivered again to a new consumer; committing the second
+    * delivery empties the queue.
+    */
+   @Test
+   public void testTxRollbackReceive() throws Exception {
+
+      Queue q1 = createQueue(".1");
+      Queue q2 = createQueue(".2");
+
+      ClientSession session = sessionFactory.createSession(false, false);
+      session.start();
+
+      ClientConsumer consumer1 = session.createConsumer(q1.getName());
+      ClientConsumer consumer2 = session.createConsumer(q2.getName());
+
+      sendMessages(session);
+
+      // nothing may be delivered while the sending transaction is still open
+      assertNull(consumer1.receive(200));
+      assertNull(consumer2.receive(200));
+      session.commit();
+      session.close();
+
+      final int perQueue = NUM_MESSAGES / 2;
+      assertMessageCount(q1, perQueue);
+      assertMessageCount(q2, perQueue);
+
+      ClientSession session1 = sessionFactory.createSession(false, false);
+      ClientSession session2 = sessionFactory.createSession(false, false);
+      session1.start();
+      session2.start();
+
+      consumer1 = session1.createConsumer(q1.getName());
+      consumer2 = session2.createConsumer(q2.getName());
+
+      ClientConsumer[] consumers = new ClientConsumer[] {consumer1, consumer2};
+      ClientSession[] sessions = new ClientSession[] {session1, session2};
+      Queue[] queues = new Queue[] {q1, q2};
+
+      for (int i = 0; i < consumers.length; i++) {
+         receiveAndAcknowledge(consumers[i], i, perQueue);
+         sessions[i].rollback();
+         sessions[i].close();
+
+         sessions[i] = sessionFactory.createSession(false, false);
+         sessions[i].start();
+
+         //receive same after rollback
+         consumers[i] = sessions[i].createConsumer(queues[i].getName());
+         receiveAndAcknowledge(consumers[i], i, perQueue);
+         sessions[i].commit();
+
+         assertNull(consumers[i].receive(200));
+         // the committed acknowledgements must have removed the messages
+         assertMessageCount(queues[i], 0);
+         sessions[i].close();
+      }
+
+      q1.deleteQueue();
+      q2.deleteQueue();
+   }
+
+   /** Creates a durable ANYCAST queue named {@code baseAddress + suffix} on the base address. */
+   private Queue createQueue(String suffix) throws Exception {
+      return server.createQueue(baseAddress, RoutingType.ANYCAST, baseAddress.concat(suffix), null, true, false, Queue.MAX_CONSUMERS_UNLIMITED, false, true);
+   }
+
+   /** Sends {@link #NUM_MESSAGES} durable text messages to the base address without committing. */
+   private void sendMessages(ClientSession session) throws Exception {
+      ClientProducer producer = session.createProducer(baseAddress);
+      for (int i = 0; i < NUM_MESSAGES; i++) {
+         ClientMessage m = session.createMessage(ClientMessage.TEXT_TYPE, true);
+         m.getBodyBuffer().writeString("AnyCast" + i);
+         producer.send(m);
+      }
+   }
+
+   /**
+    * Receives and acknowledges exactly {@code expected} messages and asserts that
+    * no further message is available afterwards.
+    *
+    * @param consumer consumer to receive from
+    * @param index    consumer index, only used in the log output
+    * @param expected number of messages that must be available
+    */
+   private void receiveAndAcknowledge(ClientConsumer consumer, int index, int expected) throws Exception {
+      for (int j = 0; j < expected; j++) {
+         ClientMessage m = consumer.receive(2000);
+         assertNotNull(m);
+         // without an acknowledgement the surrounding commit/rollback has nothing to act on
+         m.acknowledge();
+         System.out.println("consumer" + index + " received: " + m.getBodyBuffer().readString());
+      }
+
+      assertNull(consumer.receive(200));
+   }
+
+   /** Waits up to two seconds for {@code queue} to report exactly {@code expected} messages. */
+   private void assertMessageCount(Queue queue, long expected) throws Exception {
+      assertTrue(TimeUtils.waitOnBoolean(true, 2000, () -> expected == queue.getMessageCount()));
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a73aa095/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/MulticastTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/MulticastTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/MulticastTest.java
new file mode 100644
index 0000000..487a9f9
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/MulticastTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.addressing;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.RoutingType;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.TimeUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Transacted send/receive tests against an address that only has the
+ * {@link RoutingType#MULTICAST} routing type and two queues bound to it.
+ * <p>
+ * Both tests send {@link #NUM_MESSAGES} messages to the address and expect every
+ * queue to end up with a copy of all of them.
+ */
+public class MulticastTest extends ActiveMQTestBase {
+
+   /** Number of messages sent to the address by every test. */
+   private static final int NUM_MESSAGES = 10;
+
+   private final SimpleString baseAddress = new SimpleString("multicast.address");
+
+   private AddressInfo addressInfo;
+
+   private ActiveMQServer server;
+
+   private ClientSessionFactory sessionFactory;
+
+   /**
+    * Starts an in-VM server, opens a session factory to it and creates the
+    * MULTICAST address used by the tests.
+    */
+   @Before
+   public void setup() throws Exception {
+      server = createServer(true);
+      server.start();
+
+      server.waitForActivation(10, TimeUnit.SECONDS);
+
+      // Register the locator with the test base class as well; previously only the
+      // session factory was registered and the locator itself was never released.
+      ServerLocator sl = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY)));
+      sessionFactory = sl.createSessionFactory();
+
+      addSessionFactory(sessionFactory);
+
+      addressInfo = new AddressInfo(baseAddress);
+      addressInfo.addRoutingType(RoutingType.MULTICAST);
+      server.createOrUpdateAddressInfo(addressInfo);
+   }
+
+   /**
+    * Messages sent in a transaction must not be visible before the commit; after
+    * the commit every queue holds all of them, and once they are received,
+    * acknowledged and committed the queue is empty.
+    */
+   @Test
+   public void testTxCommitReceive() throws Exception {
+
+      Queue q1 = createQueue(".1");
+      Queue q2 = createQueue(".2");
+
+      ClientSession session = sessionFactory.createSession(false, false);
+      session.start();
+
+      ClientConsumer consumer1 = session.createConsumer(q1.getName());
+      ClientConsumer consumer2 = session.createConsumer(q2.getName());
+
+      sendMessages(session);
+
+      // nothing may be delivered while the sending transaction is still open
+      assertNull(consumer1.receive(200));
+      assertNull(consumer2.receive(200));
+      session.commit();
+
+      assertMessageCount(q1, NUM_MESSAGES);
+      assertMessageCount(q2, NUM_MESSAGES);
+
+      ClientConsumer[] consumers = new ClientConsumer[] {consumer1, consumer2};
+      Queue[] queues = new Queue[] {q1, q2};
+      for (int i = 0; i < consumers.length; i++) {
+         receiveAndAcknowledge(consumers[i], i, NUM_MESSAGES);
+         session.commit();
+
+         assertNull(consumers[i].receive(200));
+         // the committed acknowledgements must have removed the messages
+         assertMessageCount(queues[i], 0);
+      }
+
+      q1.deleteQueue();
+      q2.deleteQueue();
+   }
+
+   /**
+    * Messages that were received and acknowledged in a transaction which is then
+    * rolled back must be delivered again to a new consumer; committing the second
+    * delivery empties the queue.
+    */
+   @Test
+   public void testTxRollbackReceive() throws Exception {
+
+      Queue q1 = createQueue(".1");
+      Queue q2 = createQueue(".2");
+
+      ClientSession session = sessionFactory.createSession(false, false);
+      session.start();
+
+      ClientConsumer consumer1 = session.createConsumer(q1.getName());
+      ClientConsumer consumer2 = session.createConsumer(q2.getName());
+
+      sendMessages(session);
+
+      // nothing may be delivered while the sending transaction is still open
+      assertNull(consumer1.receive(200));
+      assertNull(consumer2.receive(200));
+      session.commit();
+      session.close();
+
+      assertMessageCount(q1, NUM_MESSAGES);
+      assertMessageCount(q2, NUM_MESSAGES);
+
+      ClientSession session1 = sessionFactory.createSession(false, false);
+      ClientSession session2 = sessionFactory.createSession(false, false);
+      session1.start();
+      session2.start();
+
+      consumer1 = session1.createConsumer(q1.getName());
+      consumer2 = session2.createConsumer(q2.getName());
+
+      ClientConsumer[] consumers = new ClientConsumer[] {consumer1, consumer2};
+      ClientSession[] sessions = new ClientSession[] {session1, session2};
+      Queue[] queues = new Queue[] {q1, q2};
+
+      for (int i = 0; i < consumers.length; i++) {
+         receiveAndAcknowledge(consumers[i], i, NUM_MESSAGES);
+         sessions[i].rollback();
+         sessions[i].close();
+
+         sessions[i] = sessionFactory.createSession(false, false);
+         sessions[i].start();
+
+         //receive same after rollback
+         consumers[i] = sessions[i].createConsumer(queues[i].getName());
+         receiveAndAcknowledge(consumers[i], i, NUM_MESSAGES);
+         sessions[i].commit();
+
+         assertNull(consumers[i].receive(200));
+         // the committed acknowledgements must have removed the messages
+         assertMessageCount(queues[i], 0);
+         sessions[i].close();
+      }
+
+      q1.deleteQueue();
+      q2.deleteQueue();
+   }
+
+   /** Creates a durable MULTICAST queue named {@code baseAddress + suffix} on the base address. */
+   private Queue createQueue(String suffix) throws Exception {
+      return server.createQueue(baseAddress, RoutingType.MULTICAST, baseAddress.concat(suffix), null, true, false, Queue.MAX_CONSUMERS_UNLIMITED, false, true);
+   }
+
+   /** Sends {@link #NUM_MESSAGES} durable text messages to the base address without committing. */
+   private void sendMessages(ClientSession session) throws Exception {
+      ClientProducer producer = session.createProducer(baseAddress);
+      for (int i = 0; i < NUM_MESSAGES; i++) {
+         ClientMessage m = session.createMessage(ClientMessage.TEXT_TYPE, true);
+         // body previously read "AnyCast" (copied from AnycastTest), which mislabelled the log output
+         m.getBodyBuffer().writeString("Multicast" + i);
+         producer.send(m);
+      }
+   }
+
+   /**
+    * Receives and acknowledges exactly {@code expected} messages and asserts that
+    * no further message is available afterwards.
+    *
+    * @param consumer consumer to receive from
+    * @param index    consumer index, only used in the log output
+    * @param expected number of messages that must be available
+    */
+   private void receiveAndAcknowledge(ClientConsumer consumer, int index, int expected) throws Exception {
+      for (int j = 0; j < expected; j++) {
+         ClientMessage m = consumer.receive(2000);
+         assertNotNull(m);
+         // without an acknowledgement the surrounding commit/rollback has nothing to act on
+         m.acknowledge();
+         System.out.println("consumer" + index + " received: " + m.getBodyBuffer().readString());
+      }
+
+      assertNull(consumer.receive(200));
+   }
+
+   /** Waits up to two seconds for {@code queue} to report exactly {@code expected} messages. */
+   private void assertMessageCount(Queue queue, long expected) throws Exception {
+      assertTrue(TimeUtils.waitOnBoolean(true, 2000, () -> expected == queue.getMessageCount()));
+   }
+
+}

Reply via email to