http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityNotificationTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityNotificationTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityNotificationTest.java index 9e3a60f..f0f9180 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityNotificationTest.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityNotificationTest.java @@ -16,8 +16,8 @@ import org.junit.After; import org.junit.Test; -import static org.hornetq.api.core.management.NotificationType.SECURITY_AUTHENTICATION_VIOLATION; -import static org.hornetq.api.core.management.NotificationType.SECURITY_PERMISSION_VIOLATION; +import static org.hornetq.api.core.management.CoreNotificationType.SECURITY_AUTHENTICATION_VIOLATION; +import static org.hornetq.api.core.management.CoreNotificationType.SECURITY_PERMISSION_VIOLATION; import java.util.HashSet; import java.util.Set; @@ -148,11 +148,11 @@ public class SecurityNotificationTest extends UnitTestCase { super.setUp(); - Configuration conf = createBasicConfig(); - conf.setSecurityEnabled(true); - // the notifications are independent of JMX - conf.setJMXManagementEnabled(false); - conf.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName())); + Configuration conf = createBasicConfig() + .setSecurityEnabled(true) + // the notifications are independent of JMX + .setJMXManagementEnabled(false) + .addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName())); server = HornetQServers.newHornetQServer(conf, false); server.start();
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/BasicOpenWireTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/BasicOpenWireTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/BasicOpenWireTest.java new file mode 100644 index 0000000..ae98d2e --- /dev/null +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/BasicOpenWireTest.java @@ -0,0 +1,249 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.hornetq.tests.integration.openwire; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.command.ActiveMQDestination; +import org.hornetq.api.core.SimpleString; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.rules.TestName; + +/** + * + * @author <a href="mailto:[email protected]">Howard Gao</a> + * + */ +public class BasicOpenWireTest extends OpenWireTestBase +{ + @Rule public TestName name = new TestName(); + + protected static final String urlString = "tcp://" + OWHOST + ":" + OWPORT + "?wireFormat.cacheEnabled=true"; + protected ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(urlString); + protected ActiveMQConnection connection; + protected String topicName = "amqTestTopic1"; + protected String queueName = "amqTestQueue1"; + protected String topicName2 = "amqTestTopic2"; + protected String queueName2 = "amqTestQueue2"; + protected String durableQueueName = "durableQueueName"; + + protected String messageTextPrefix = ""; + protected boolean topic = true; + + protected Map<String, SimpleString> testQueues = new HashMap<String, SimpleString>(); + + @Override + @Before + public void setUp() throws Exception + { + super.setUp(); + SimpleString coreQueue = new SimpleString("jms.queue." + queueName); + this.server.createQueue(coreQueue, coreQueue, null, false, false); + testQueues.put(queueName, coreQueue); + + SimpleString coreQueue2 = new SimpleString("jms.queue." 
+ queueName2); + this.server.createQueue(coreQueue2, coreQueue2, null, false, false); + testQueues.put(queueName2, coreQueue2); + + SimpleString durableQueue = new SimpleString("jms.queue." + durableQueueName); + this.server.createQueue(durableQueue, durableQueue, null, true, false); + testQueues.put(durableQueueName, durableQueue); + + if (!enableSecurity) + { + connection = (ActiveMQConnection) factory.createConnection(); + } + } + + @Override + @After + public void tearDown() throws Exception + { + System.out.println("tear down! " + connection); + try + { + if (connection != null) + { + System.out.println("closing connection"); + connection.close(); + System.out.println("connection closed."); + } + + Iterator<SimpleString> iterQueues = testQueues.values().iterator(); + while (iterQueues.hasNext()) + { + SimpleString coreQ = iterQueues.next(); + this.server.destroyQueue(coreQ); + System.out.println("Destroyed queue: " + coreQ); + } + testQueues.clear(); + } + catch (Throwable e) + { + System.out.println("Exception !! 
" + e); + e.printStackTrace(); + } + finally + { + super.tearDown(); + System.out.println("Super done."); + } + } + + public ActiveMQDestination createDestination(Session session, byte type, String name) throws Exception + { + if (name == null) + { + return createDestination(session, type); + } + + switch (type) + { + case ActiveMQDestination.QUEUE_TYPE: + makeSureCoreQueueExist(name); + return (ActiveMQDestination) session.createQueue(name); + case ActiveMQDestination.TOPIC_TYPE: + return (ActiveMQDestination) session.createTopic(name); + case ActiveMQDestination.TEMP_QUEUE_TYPE: + return (ActiveMQDestination) session.createTemporaryQueue(); + case ActiveMQDestination.TEMP_TOPIC_TYPE: + return (ActiveMQDestination) session.createTemporaryTopic(); + default: + throw new IllegalArgumentException("type: " + type); + } + } + + public void makeSureCoreQueueExist(String qname) throws Exception + { + SimpleString coreQ = testQueues.get(qname); + if (coreQ == null) + { + coreQ = new SimpleString("jms.queue." 
+ qname); + this.server.createQueue(coreQ, coreQ, null, false, false); + testQueues.put(qname, coreQ); + } + } + + public ActiveMQDestination createDestination(Session session, byte type) throws JMSException + { + switch (type) + { + case ActiveMQDestination.QUEUE_TYPE: + return (ActiveMQDestination) session.createQueue(queueName); + case ActiveMQDestination.TOPIC_TYPE: + return (ActiveMQDestination) session.createTopic(topicName); + case ActiveMQDestination.TEMP_QUEUE_TYPE: + return (ActiveMQDestination) session.createTemporaryQueue(); + case ActiveMQDestination.TEMP_TOPIC_TYPE: + return (ActiveMQDestination) session.createTemporaryTopic(); + default: + throw new IllegalArgumentException("type: " + type); + } + } + + protected ActiveMQDestination createDestination2(Session session, byte type) throws JMSException + { + switch (type) + { + case ActiveMQDestination.QUEUE_TYPE: + return (ActiveMQDestination) session.createQueue(queueName2); + case ActiveMQDestination.TOPIC_TYPE: + return (ActiveMQDestination) session.createTopic(topicName2); + case ActiveMQDestination.TEMP_QUEUE_TYPE: + return (ActiveMQDestination) session.createTemporaryQueue(); + case ActiveMQDestination.TEMP_TOPIC_TYPE: + return (ActiveMQDestination) session.createTemporaryTopic(); + default: + throw new IllegalArgumentException("type: " + type); + } + } + + protected void sendMessages(Session session, Destination destination, int count) throws JMSException + { + MessageProducer producer = session.createProducer(destination); + sendMessages(session, producer, count); + producer.close(); + } + + protected void sendMessages(Session session, MessageProducer producer, int count) throws JMSException + { + for (int i = 0; i < count; i++) + { + producer.send(session.createTextMessage(messageTextPrefix + i)); + } + } + + protected void sendMessages(Connection connection, Destination destination, int count) throws JMSException + { + Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE); + sendMessages(session, destination, count); + session.close(); + } + + /** + * @param messsage + * @param firstSet + * @param secondSet + */ + protected void assertTextMessagesEqual(String messsage, Message[] firstSet, + Message[] secondSet) throws JMSException + { + assertEquals("Message count does not match: " + messsage, + firstSet.length, secondSet.length); + for (int i = 0; i < secondSet.length; i++) + { + TextMessage m1 = (TextMessage) firstSet[i]; + TextMessage m2 = (TextMessage) secondSet[i]; + assertFalse("Message " + (i + 1) + " did not match : " + messsage + + ": expected {" + m1 + "}, but was {" + m2 + "}", m1 == null + ^ m2 == null); + assertEquals("Message " + (i + 1) + " did not match: " + messsage + + ": expected {" + m1 + "}, but was {" + m2 + "}", m1.getText(), + m2.getText()); + } + } + + protected Connection createConnection() throws JMSException + { + return factory.createConnection(); + } + + protected void safeClose(Session s) + { + try + { + s.close(); + } + catch (Throwable e) + { + } + } + + +} + + http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/BasicSecurityTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/BasicSecurityTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/BasicSecurityTest.java new file mode 100644 index 0000000..e7b09d2 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/BasicSecurityTest.java @@ -0,0 +1,256 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.tests.integration.openwire; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSSecurityException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TemporaryQueue; +import javax.jms.TextMessage; + +import org.apache.activemq.command.ActiveMQQueue; +import org.junit.Before; +import org.junit.Test; + +/** +* +* @author <a href="mailto:[email protected]">Howard Gao</a> +* +*/ +public class BasicSecurityTest extends BasicOpenWireTest +{ + @Before + public void setUp() throws Exception + { + this.enableSecurity = true; + super.setUp(); + } + + @Test + public void testConnectionWithCredentials() throws Exception + { + Connection newConn = null; + + //correct + try + { + newConn = factory.createConnection("openwireSender", "SeNdEr"); + newConn.start(); + newConn.close(); + + newConn = factory.createConnection("openwireReceiver", "ReCeIvEr"); + newConn.start(); + newConn.close(); + + newConn = null; + } + finally + { + if (newConn != null) + { + newConn.close(); + } + } + + //wrong password + try + { + newConn = factory.createConnection("openwireSender", "WrongPasswD"); + newConn.start(); + } + catch (JMSSecurityException e) + { + //expected + } + finally + { + if (newConn != null) + { + newConn.close(); + } + } + + //wrong user + try + { + newConn = factory.createConnection("wronguser", "SeNdEr"); + newConn.start(); + } + catch (JMSSecurityException e) + { + //expected + } + finally + { + if (newConn != null) + { + newConn.close(); + } + 
} + + //both wrong + try + { + newConn = factory.createConnection("wronguser", "wrongpass"); + newConn.start(); + } + catch (JMSSecurityException e) + { + //expected + } + finally + { + if (newConn != null) + { + newConn.close(); + } + } + + //default user + try + { + newConn = factory.createConnection(); + newConn.start(); + } + catch (JMSSecurityException e) + { + //expected + } + finally + { + if (newConn != null) + { + newConn.close(); + } + } + } + + @Test + public void testSendnReceiveAuthorization() throws Exception + { + Connection sendingConn = null; + Connection receivingConn = null; + + //Sender + try + { + Destination dest = new ActiveMQQueue(queueName); + + receivingConn = factory.createConnection("openwireReceiver", "ReCeIvEr"); + receivingConn.start(); + + sendingConn = factory.createConnection("openwireSender", "SeNdEr"); + sendingConn.start(); + + Session sendingSession = sendingConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session receivingSession = receivingConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + TextMessage message = sendingSession.createTextMessage("Hello World"); + + MessageProducer producer = null; + + producer = receivingSession.createProducer(dest); + + try + { + producer.send(message); + } + catch (JMSSecurityException e) + { + //expected + producer.close(); + } + + producer = sendingSession.createProducer(dest); + producer.send(message); + + MessageConsumer consumer = null; + try + { + consumer = sendingSession.createConsumer(dest); + } + catch (JMSSecurityException e) + { + //expected + } + + consumer = receivingSession.createConsumer(dest); + TextMessage received = (TextMessage) consumer.receive(); + + assertNotNull(received); + assertEquals("Hello World", received.getText()); + } + finally + { + if (sendingConn != null) + { + sendingConn.close(); + } + + if (receivingConn != null) + { + receivingConn.close(); + } + } + } + + @Test + public void testCreateTempDestinationAuthorization() throws Exception + { + 
Connection conn1 = null; + Connection conn2 = null; + + //Sender + try + { + conn1 = factory.createConnection("openwireGuest", "GuEsT"); + conn1.start(); + + conn2 = factory.createConnection("openwireDestinationManager", "DeStInAtIoN"); + conn2.start(); + + Session session1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE); + + try + { + session1.createTemporaryQueue(); + fail("user shouldn't be able to create temp queue"); + } + catch (JMSSecurityException e) + { + //expected + } + + Session session2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE); + + TemporaryQueue q = session2.createTemporaryQueue(); + assertNotNull(q); + } + finally + { + if (conn1 != null) + { + conn1.close(); + } + + if (conn2 != null) + { + conn2.close(); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/OpenWireTestBase.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/OpenWireTestBase.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/OpenWireTestBase.java new file mode 100644 index 0000000..668095f --- /dev/null +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/OpenWireTestBase.java @@ -0,0 +1,190 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. 
See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.tests.integration.openwire; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import javax.jms.ConnectionFactory; +import javax.management.MBeanServer; +import javax.management.MBeanServerFactory; + +import org.hornetq.api.core.SimpleString; +import org.hornetq.api.core.TransportConfiguration; +import org.hornetq.api.jms.management.JMSServerControl; +import org.hornetq.core.config.Configuration; +import org.hornetq.core.remoting.impl.netty.TransportConstants; +import org.hornetq.core.security.Role; +import org.hornetq.core.server.HornetQServer; +import org.hornetq.core.settings.impl.AddressSettings; +import org.hornetq.jms.server.config.ConnectionFactoryConfiguration; +import org.hornetq.jms.server.config.impl.ConnectionFactoryConfigurationImpl; +import org.hornetq.jms.server.impl.JMSServerManagerImpl; +import org.hornetq.tests.integration.management.ManagementControlHelper; +import org.hornetq.tests.unit.util.InVMNamingContext; +import org.hornetq.tests.util.ServiceTestBase; +import org.junit.After; +import org.junit.Before; + +public class OpenWireTestBase extends ServiceTestBase +{ + public static final String OWHOST = "localhost"; + public static final int OWPORT = 61616; + + protected HornetQServer server; + + protected JMSServerManagerImpl jmsServer; + protected boolean realStore = false; + protected boolean enableSecurity = false; + + protected ConnectionFactory coreCf; + protected InVMNamingContext namingContext; + + protected MBeanServer mbeanServer; + + @Override + @Before + public void setUp() throws Exception + { + super.setUp(); + server = this.createServer(realStore, true); + HashMap<String, Object> params = new HashMap<String, Object>(); + params.put(TransportConstants.PORT_PROP_NAME, "61616"); + 
params.put(TransportConstants.PROTOCOLS_PROP_NAME, "OPENWIRE"); + TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params); + + Configuration serverConfig = server.getConfiguration(); + + Map<String, AddressSettings> addressSettings = serverConfig.getAddressesSettings(); + String match = "jms.queue.#"; + AddressSettings dlaSettings = new AddressSettings(); + SimpleString dla = new SimpleString("jms.queue.ActiveMQ.DLQ"); + dlaSettings.setDeadLetterAddress(dla); + addressSettings.put(match, dlaSettings); + + serverConfig.getAcceptorConfigurations().add(transportConfiguration); + serverConfig.setSecurityEnabled(enableSecurity); + + extraServerConfig(serverConfig); + + if (enableSecurity) + { + server.getSecurityManager().addRole("openwireSender", "sender"); + server.getSecurityManager().addUser("openwireSender", "SeNdEr"); + //sender cannot receive + Role senderRole = new Role("sender", true, false, false, false, true, true, false); + + server.getSecurityManager().addRole("openwireReceiver", "receiver"); + server.getSecurityManager().addUser("openwireReceiver", "ReCeIvEr"); + //receiver cannot send + Role receiverRole = new Role("receiver", false, true, false, false, true, true, false); + + server.getSecurityManager().addRole("openwireGuest", "guest"); + server.getSecurityManager().addUser("openwireGuest", "GuEsT"); + + //guest cannot do anything + Role guestRole = new Role("guest", false, false, false, false, false, false, false); + + server.getSecurityManager().addRole("openwireDestinationManager", "manager"); + server.getSecurityManager().addUser("openwireDestinationManager", "DeStInAtIoN"); + + //manager cannot send or receive, but can create temporary queues + Role destRole = new Role("manager", false, false, false, false, true, true, false); + + Map<String, Set<Role>> settings = server.getConfiguration().getSecurityRoles(); + if (settings == null) + { + settings = new HashMap<String, Set<Role>>(); + 
server.getConfiguration().setSecurityRoles(settings); + } + Set<Role> anySet = settings.get("#"); + if (anySet == null) + { + anySet = new HashSet<Role>(); + settings.put("#", anySet); + } + anySet.add(senderRole); + anySet.add(receiverRole); + anySet.add(guestRole); + anySet.add(destRole); + } + jmsServer = new JMSServerManagerImpl(server); + namingContext = new InVMNamingContext(); + jmsServer.setContext(namingContext); + jmsServer.start(); + + registerConnectionFactory(); + + mbeanServer = MBeanServerFactory.createMBeanServer(); + System.out.println("debug: server started"); + } + + //override this to add extra server configs + protected void extraServerConfig(Configuration serverConfig) + { + } + + protected void registerConnectionFactory() throws Exception + { + List<TransportConfiguration> connectorConfigs = new ArrayList<TransportConfiguration>(); + connectorConfigs.add(new TransportConfiguration(INVM_CONNECTOR_FACTORY)); + + createCF(connectorConfigs, "/cf"); + + coreCf = (ConnectionFactory) namingContext.lookup("/cf"); + } + + protected void createCF(final List<TransportConfiguration> connectorConfigs, final String... 
jndiBindings) throws Exception + { + final int retryInterval = 1000; + final double retryIntervalMultiplier = 1.0; + final int reconnectAttempts = -1; + final int callTimeout = 30000; + final boolean ha = false; + List<String> connectorNames = registerConnectors(server, connectorConfigs); + + String cfName = name.getMethodName(); + if (cfName == null) + { + cfName = "cfOpenWire"; + } + ConnectionFactoryConfiguration configuration = new ConnectionFactoryConfigurationImpl() + .setName(cfName) + .setConnectorNames(connectorNames) + .setRetryInterval(retryInterval) + .setRetryIntervalMultiplier(retryIntervalMultiplier) + .setCallTimeout(callTimeout) + .setReconnectAttempts(reconnectAttempts); + jmsServer.createConnectionFactory(false, configuration, jndiBindings); + } + + protected JMSServerControl getJMSServerControl() throws Exception + { + return ManagementControlHelper.createJMSServerControl(mbeanServer); + } + + @Override + @After + public void tearDown() throws Exception + { + MBeanServerFactory.releaseMBeanServer(mbeanServer); + mbeanServer = null; + server.stop(); + super.tearDown(); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/OpenWireUtilTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/OpenWireUtilTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/OpenWireUtilTest.java new file mode 100644 index 0000000..b17b8df --- /dev/null +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/OpenWireUtilTest.java @@ -0,0 +1,37 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.tests.integration.openwire; + +import static org.junit.Assert.assertEquals; + +import org.hornetq.core.protocol.openwire.OpenWireUtil; +import org.junit.Test; + +public class OpenWireUtilTest +{ + @Test + public void testWildcardConversion() throws Exception + { + String amqTarget = "TEST.ONE.>"; + String coreTarget = OpenWireUtil.convertWildcard(amqTarget); + assertEquals("TEST.ONE.#", coreTarget); + + amqTarget = "TEST.*.ONE"; + coreTarget = OpenWireUtil.convertWildcard(amqTarget); + assertEquals("TEST.*.ONE", coreTarget); + + amqTarget = "a.*.>.>"; + coreTarget = OpenWireUtil.convertWildcard(amqTarget); + assertEquals("a.*.#", coreTarget); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/SimpleOpenWireTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/SimpleOpenWireTest.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/SimpleOpenWireTest.java new file mode 100644 index 0000000..faf6fe1 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/SimpleOpenWireTest.java @@ -0,0 +1,280 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.tests.integration.openwire; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; +import javax.jms.TextMessage; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.junit.Before; +import org.junit.Test; + +/** + * + * @author <a href="mailto:[email protected]">Howard Gao</a> + * + */ +public class SimpleOpenWireTest extends BasicOpenWireTest +{ + @Override + @Before + public void setUp() throws Exception + { + this.realStore = true; + super.setUp(); + } + + @Test + public void testSimpleQueue() throws Exception + { + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + System.out.println("creating queue: " + queueName); + Destination dest = new ActiveMQQueue(queueName); + + System.out.println("creating producer..."); + MessageProducer producer = session.createProducer(dest); + + final int num = 1; + final String msgBase = "MfromAMQ-"; + for (int i = 0; i < num; i++) + { + TextMessage msg = session.createTextMessage("MfromAMQ-" + i); + producer.send(msg); + System.out.println("sent: "); + } + + //receive + MessageConsumer consumer = session.createConsumer(dest); + + System.out.println("receiving messages..."); + for (int i = 0; i < num; i++) + { + TextMessage msg = 
(TextMessage) consumer.receive(5000); + System.out.println("received: " + msg); + String content = msg.getText(); + System.out.println("content: " + content); + assertEquals(msgBase + i, content); + } + + assertNull(consumer.receive(1000)); + + session.close(); + } + + @Test + public void testSimpleTopic() throws Exception + { + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + System.out.println("creating queue: " + topicName); + Destination dest = new ActiveMQTopic(topicName); + + MessageConsumer consumer1 = session.createConsumer(dest); + MessageConsumer consumer2 = session.createConsumer(dest); + + MessageProducer producer = session.createProducer(dest); + + final int num = 1; + final String msgBase = "MfromAMQ-"; + for (int i = 0; i < num; i++) + { + TextMessage msg = session.createTextMessage("MfromAMQ-" + i); + producer.send(msg); + System.out.println("Sent a message"); + } + + //receive + System.out.println("receiving messages..."); + for (int i = 0; i < num; i++) + { + TextMessage msg = (TextMessage) consumer1.receive(5000); + System.out.println("received: " + msg); + String content = msg.getText(); + assertEquals(msgBase + i, content); + } + + assertNull(consumer1.receive(500)); + + System.out.println("receiving messages..."); + for (int i = 0; i < num; i++) + { + TextMessage msg = (TextMessage) consumer2.receive(5000); + System.out.println("received: " + msg); + String content = msg.getText(); + assertEquals(msgBase + i, content); + } + + assertNull(consumer2.receive(500)); + session.close(); + } + + @Test + public void testSimpleTempTopic() throws Exception + { + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + System.out.println("creating temp topic"); + TemporaryTopic tempTopic = session.createTemporaryTopic(); + + System.out.println("create consumer 1"); + MessageConsumer consumer1 = session.createConsumer(tempTopic); + 
System.out.println("create consumer 2"); + MessageConsumer consumer2 = session.createConsumer(tempTopic); + + System.out.println("create producer"); + MessageProducer producer = session.createProducer(tempTopic); + + System.out.println("sending messages"); + final int num = 1; + final String msgBase = "MfromAMQ-"; + for (int i = 0; i < num; i++) + { + TextMessage msg = session.createTextMessage("MfromAMQ-" + i); + producer.send(msg); + System.out.println("Sent a message"); + } + + //receive + System.out.println("receiving messages..."); + for (int i = 0; i < num; i++) + { + TextMessage msg = (TextMessage) consumer1.receive(5000); + System.out.println("received: " + msg); + String content = msg.getText(); + assertEquals(msgBase + i, content); + } + + assertNull(consumer1.receive(500)); + + System.out.println("receiving messages..."); + for (int i = 0; i < num; i++) + { + TextMessage msg = (TextMessage) consumer2.receive(5000); + System.out.println("received: " + msg); + String content = msg.getText(); + assertEquals(msgBase + i, content); + } + + assertNull(consumer2.receive(500)); + session.close(); + } + + @Test + public void testSimpleTempQueue() throws Exception + { + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + System.out.println("creating temp queue"); + TemporaryQueue tempQueue = session.createTemporaryQueue(); + + System.out.println("create consumer 1"); + MessageConsumer consumer1 = session.createConsumer(tempQueue); + + System.out.println("create producer"); + MessageProducer producer = session.createProducer(tempQueue); + + System.out.println("sending messages"); + final int num = 1; + final String msgBase = "MfromAMQ-"; + for (int i = 0; i < num; i++) + { + TextMessage msg = session.createTextMessage("MfromAMQ-" + i); + producer.send(msg); + System.out.println("Sent a message"); + } + + //receive + System.out.println("receiving messages..."); + for (int i = 0; i < num; i++) + { + TextMessage msg = 
(TextMessage) consumer1.receive(5000); + System.out.println("received: " + msg); + String content = msg.getText(); + assertEquals(msgBase + i, content); + } + + assertNull(consumer1.receive(500)); + session.close(); + } + + /** + * This is the example shipped with the distribution + * @throws Exception + */ + @Test + public void testOpenWireExample() throws Exception + { + Connection exConn = null; + + try + { + String urlString = "tcp://" + OWHOST + ":" + OWPORT + "?wireFormat.cacheEnabled=true"; + ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory(urlString); + + // Step 2. Perform a lookup on the queue + Queue queue = new ActiveMQQueue(durableQueueName); + + // Step 4. Create a JMS Connection + exConn = exFact.createConnection(); + + // Step 10. Start the Connection + exConn.start(); + + // Step 5. Create a JMS Session + Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Step 6. Create a JMS Message Producer + MessageProducer producer = session.createProducer(queue); + + // Step 7. Create a Text Message + TextMessage message = session.createTextMessage("This is a text message"); + + //System.out.println("Sent message: " + message.getText()); + + // Step 8. Send the Message + producer.send(message); + + // Step 9. Create a JMS Message Consumer + MessageConsumer messageConsumer = session.createConsumer(queue); + + // Step 11. 
Receive the message + TextMessage messageReceived = (TextMessage) messageConsumer.receive(5000); + + System.out.println("Received message: " + messageReceived); + + assertEquals("This is a text message", messageReceived.getText()); + } + finally + { + if (exConn != null) + { + exConn.close(); + } + } + + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer10Test.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer10Test.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer10Test.java new file mode 100644 index 0000000..d6a9942 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer10Test.java @@ -0,0 +1,108 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.hornetq.tests.integration.openwire.amq; + +import java.util.Arrays; +import java.util.Collection; + +import javax.jms.DeliveryMode; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.command.ActiveMQDestination; +import org.hornetq.tests.integration.openwire.BasicOpenWireTest; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * adapted from: org.apache.activemq.JMSConsumerTest + * + * @author <a href="mailto:[email protected]">Howard Gao</a> + * + */ +@RunWith(Parameterized.class) +public class JMSConsumer10Test extends BasicOpenWireTest +{ + @Parameterized.Parameters(name = "deliveryMode={0} ackMode={1} destinationType={2}") + public static Collection<Object[]> getParams() + { + return Arrays.asList(new Object[][] { + {DeliveryMode.NON_PERSISTENT, Session.AUTO_ACKNOWLEDGE, ActiveMQDestination.QUEUE_TYPE}, + {DeliveryMode.NON_PERSISTENT, Session.DUPS_OK_ACKNOWLEDGE, ActiveMQDestination.QUEUE_TYPE}, + {DeliveryMode.NON_PERSISTENT, Session.CLIENT_ACKNOWLEDGE, ActiveMQDestination.QUEUE_TYPE}, + {DeliveryMode.PERSISTENT, Session.AUTO_ACKNOWLEDGE, ActiveMQDestination.QUEUE_TYPE}, + {DeliveryMode.PERSISTENT, Session.DUPS_OK_ACKNOWLEDGE, ActiveMQDestination.QUEUE_TYPE}, + {DeliveryMode.PERSISTENT, Session.CLIENT_ACKNOWLEDGE, ActiveMQDestination.QUEUE_TYPE} + }); + } + + public int deliveryMode; + public int ackMode; + public byte destinationType; + + public JMSConsumer10Test(int deliveryMode, int ackMode, byte destinationType) + { + this.deliveryMode = deliveryMode; + this.ackMode = ackMode; + this.destinationType = destinationType; + } + + @Test + public void testUnackedWithPrefetch1StayInQueue() throws Exception + { + + // Set prefetch to 1 + connection.getPrefetchPolicy().setAll(1); + connection.start(); + + // Use all the ack modes + Session session = 
connection.createSession(false, ackMode); + ActiveMQDestination destination = createDestination(session, destinationType); + MessageConsumer consumer = session.createConsumer(destination); + + // Send the messages + sendMessages(session, destination, 4); + + // Only pick up the first 2 messages. + Message message = null; + for (int i = 0; i < 2; i++) + { + message = consumer.receive(1000); + assertNotNull(message); + } + message.acknowledge(); + + connection.close(); + connection = (ActiveMQConnection) factory.createConnection(); + connection.getPrefetchPolicy().setAll(1); + connection.start(); + + // Use all the ack modes + session = connection.createSession(false, ackMode); + consumer = session.createConsumer(destination); + + // Pickup the rest of the messages. + for (int i = 0; i < 2; i++) + { + message = consumer.receive(1000); + assertNotNull(message); + } + message.acknowledge(); + assertNull(consumer.receiveNoWait()); + + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer11Test.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer11Test.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer11Test.java new file mode 100644 index 0000000..5376f88 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer11Test.java @@ -0,0 +1,115 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.tests.integration.openwire.amq; + +import java.util.Arrays; +import java.util.Collection; + +import javax.jms.DeliveryMode; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.command.ActiveMQDestination; +import org.hornetq.tests.integration.openwire.BasicOpenWireTest; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * adapted from: org.apache.activemq.JMSConsumerTest + * + * @author <a href="mailto:[email protected]">Howard Gao</a> + * + */ +@RunWith(Parameterized.class) +public class JMSConsumer11Test extends BasicOpenWireTest +{ + @Parameterized.Parameters(name = "deliveryMode={0}") + public static Collection<Object[]> getParams() + { + return Arrays.asList(new Object[][] { + {DeliveryMode.NON_PERSISTENT}, + {DeliveryMode.PERSISTENT} + }); + } + + public int deliveryMode; + + public JMSConsumer11Test(int deliveryMode) + { + this.deliveryMode = deliveryMode; + } + + @Test + public void testPrefetch1MessageNotDispatched() throws Exception + { + // Set prefetch to 1 + connection.getPrefetchPolicy().setAll(1); + connection.start(); + + Session session = connection.createSession(true, 0); + ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE); + MessageConsumer consumer = session.createConsumer(destination); + + // Send 2 messages to the destination. 
+ sendMessages(session, destination, 2); + session.commit(); + + // The prefetch should fill up with 1 message. + // Since prefetch is still full, the 2nd message should get dispatched + // to another consumer.. lets create the 2nd consumer test that it does + // make sure it does. + ActiveMQConnection connection2 = (ActiveMQConnection) factory + .createConnection(); + connection2.start(); + Session session2 = connection2.createSession(true, 0); + MessageConsumer consumer2 = session2.createConsumer(destination); + + System.out.println("consumer receiving ..."); + // Pick up the first message. + Message message1 = consumer.receive(1000); + System.out.println("received1: " + message1); + assertNotNull(message1); + + System.out.println("consumer 2 receiving..."); + // Pick up the 2nd messages. + Message message2 = consumer2.receive(5000); + System.out.println("received2: " + message2); + assertNotNull(message2); + + System.out.println("commitning sessions !! " + session.getClass().getName()); + session.commit(); + System.out.println("commited session, now 2"); + session2.commit(); + + System.out.println("all commited"); + Message m = consumer.receiveNoWait(); + System.out.println("recieved 3: " + m); + assertNull(m); + + try + { + connection2.close(); + } + catch (Throwable e) + { + System.err.println("exception e: " + e); + e.printStackTrace(); + } + + System.out.println("Test finished!!"); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer12Test.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer12Test.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer12Test.java new file mode 100644 index 0000000..633cd01 --- /dev/null +++ 
b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer12Test.java @@ -0,0 +1,92 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.tests.integration.openwire.amq; + +import java.util.Arrays; +import java.util.Collection; + +import javax.jms.DeliveryMode; +import javax.jms.MessageConsumer; +import javax.jms.Session; + +import org.apache.activemq.command.ActiveMQDestination; +import org.hornetq.tests.integration.openwire.BasicOpenWireTest; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * adapted from: org.apache.activemq.JMSConsumerTest + * + * @author <a href="mailto:[email protected]">Howard Gao</a> + * + */ +@RunWith(Parameterized.class) +public class JMSConsumer12Test extends BasicOpenWireTest +{ + @Parameterized.Parameters(name = "deliveryMode={0} destinationType={1}") + public static Collection<Object[]> getParams() + { + return Arrays.asList(new Object[][] { + {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.QUEUE_TYPE}, + {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.TOPIC_TYPE} + }); + } + + public int deliveryMode; + public byte destinationType; + + public JMSConsumer12Test(int deliveryMode, byte destinationType) + { + this.deliveryMode = deliveryMode; + this.destinationType = destinationType; + } + + @Test + public void testDontStart() throws Exception + { + + 
Session session = connection.createSession(false, + Session.AUTO_ACKNOWLEDGE); + ActiveMQDestination destination = createDestination(session, + destinationType); + MessageConsumer consumer = session.createConsumer(destination); + + // Send the messages + sendMessages(session, destination, 1); + + // Make sure no messages were delivered. + assertNull(consumer.receive(1000)); + } + + @Test + public void testStartAfterSend() throws Exception + { + Session session = connection.createSession(false, + Session.AUTO_ACKNOWLEDGE); + ActiveMQDestination destination = createDestination(session, + destinationType); + MessageConsumer consumer = session.createConsumer(destination); + + // Send the messages + sendMessages(session, destination, 1); + + // Start the connection after the message was sent. + connection.start(); + + // Make sure only 1 message was delivered. + assertNotNull(consumer.receive(1000)); + assertNull(consumer.receiveNoWait()); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer13Test.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer13Test.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer13Test.java new file mode 100644 index 0000000..82d8326 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer13Test.java @@ -0,0 +1,85 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.tests.integration.openwire.amq; + +import java.util.Arrays; +import java.util.Collection; + +import javax.jms.DeliveryMode; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.command.ActiveMQDestination; +import org.hornetq.tests.integration.openwire.BasicOpenWireTest; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * adapted from: org.apache.activemq.JMSConsumerTest + * + * @author <a href="mailto:[email protected]">Howard Gao</a> + * + */ +@RunWith(Parameterized.class) +public class JMSConsumer13Test extends BasicOpenWireTest +{ + @Parameterized.Parameters(name = "deliveryMode={0} destinationType={1}") + public static Collection<Object[]> getParams() + { + return Arrays.asList(new Object[][] { + {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.QUEUE_TYPE}, + {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.TOPIC_TYPE}, + {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.TEMP_QUEUE_TYPE}, + {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.TEMP_TOPIC_TYPE}, + {DeliveryMode.PERSISTENT, ActiveMQDestination.QUEUE_TYPE}, + {DeliveryMode.PERSISTENT, ActiveMQDestination.TOPIC_TYPE}, + {DeliveryMode.PERSISTENT, ActiveMQDestination.TEMP_QUEUE_TYPE}, + {DeliveryMode.PERSISTENT, ActiveMQDestination.TEMP_TOPIC_TYPE}, + }); + } + + public int deliveryMode; + public byte destinationType; + + public JMSConsumer13Test(int deliveryMode, byte destinationType) + { + this.deliveryMode = 
deliveryMode; + this.destinationType = destinationType; + } + + @Test + public void testReceiveMessageWithConsumer() throws Exception + { + + // Receive a message with the JMS API + connection.start(); + Session session = connection.createSession(false, + Session.AUTO_ACKNOWLEDGE); + ActiveMQDestination destination = createDestination(session, + destinationType); + MessageConsumer consumer = session.createConsumer(destination); + + // Send the messages + sendMessages(session, destination, 1); + + // Make sure only 1 message was delivered. + Message m = consumer.receive(1000); + assertNotNull(m); + assertEquals("0", ((TextMessage) m).getText()); + assertNull(consumer.receiveNoWait()); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer1Test.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer1Test.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer1Test.java new file mode 100644 index 0000000..622a55b --- /dev/null +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer1Test.java @@ -0,0 +1,121 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.hornetq.tests.integration.openwire.amq; + +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.DeliveryMode; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQMessageConsumer; +import org.apache.activemq.command.ActiveMQDestination; +import org.hornetq.tests.integration.openwire.BasicOpenWireTest; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * adapted from: org.apache.activemq.JMSConsumerTest + * + * @author <a href="mailto:[email protected]">Howard Gao</a> + * + */ +@RunWith(Parameterized.class) +public class JMSConsumer1Test extends BasicOpenWireTest +{ + @Parameterized.Parameters(name = "deliveryMode={0} destinationType={1}") + public static Collection<Object[]> getParams() + { + return Arrays.asList(new Object[][] { + {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.QUEUE_TYPE}, + {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.TOPIC_TYPE}, + {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.TEMP_QUEUE_TYPE}, + {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.TEMP_TOPIC_TYPE}, + {DeliveryMode.PERSISTENT, ActiveMQDestination.QUEUE_TYPE}, + {DeliveryMode.PERSISTENT, ActiveMQDestination.TOPIC_TYPE}, + {DeliveryMode.PERSISTENT, ActiveMQDestination.TEMP_QUEUE_TYPE}, + {DeliveryMode.PERSISTENT, ActiveMQDestination.TEMP_TOPIC_TYPE} + }); + } + + public ActiveMQDestination destination; + public int deliveryMode; + public int prefetch; + public int ackMode; + public byte destinationType; + public boolean durableConsumer; + + public JMSConsumer1Test(int deliveryMode, byte destinationType) + { + this.deliveryMode = deliveryMode; + this.destinationType = destinationType; + } + + @Test + public void testMessageListenerWithConsumerCanBeStopped() throws 
Exception + { + final AtomicInteger counter = new AtomicInteger(0); + final CountDownLatch done1 = new CountDownLatch(1); + final CountDownLatch done2 = new CountDownLatch(1); + + // Receive a message with the JMS API + connection.start(); + Session session = connection.createSession(false, + Session.AUTO_ACKNOWLEDGE); + destination = createDestination(session, destinationType); + ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session + .createConsumer(destination); + consumer.setMessageListener(new MessageListener() + { + @Override + public void onMessage(Message m) + { + counter.incrementAndGet(); + if (counter.get() == 1) + { + done1.countDown(); + } + if (counter.get() == 2) + { + done2.countDown(); + } + } + }); + + // Send a first message to make sure that the consumer dispatcher is + // running + sendMessages(session, destination, 1); + assertTrue(done1.await(1, TimeUnit.SECONDS)); + assertEquals(1, counter.get()); + + // Stop the consumer. + consumer.stop(); + + // Send a message, but should not get delivered. + sendMessages(session, destination, 1); + assertFalse(done2.await(1, TimeUnit.SECONDS)); + assertEquals(1, counter.get()); + + // Start the consumer, and the message should now get delivered. 
+ consumer.start(); + assertTrue(done2.await(1, TimeUnit.SECONDS)); + assertEquals(2, counter.get()); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer2Test.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer2Test.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer2Test.java new file mode 100644 index 0000000..cee9545 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer2Test.java @@ -0,0 +1,240 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.hornetq.tests.integration.openwire.amq; + +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQMessageConsumer; +import org.apache.activemq.command.ActiveMQDestination; +import org.hornetq.tests.integration.openwire.BasicOpenWireTest; +import org.junit.Test; + +/** + * adapted from: org.apache.activemq.JMSConsumerTest + * + * @author <a href="mailto:[email protected]">Howard Gao</a> + * + */ +public class JMSConsumer2Test extends BasicOpenWireTest +{ + @Test + public void testMessageListenerWithConsumerCanBeStoppedConcurently() throws Exception + { + + final AtomicInteger counter = new AtomicInteger(0); + final CountDownLatch closeDone = new CountDownLatch(1); + + connection.start(); + Session session = connection.createSession(false, + Session.CLIENT_ACKNOWLEDGE); + ActiveMQDestination destination = createDestination(session, + ActiveMQDestination.QUEUE_TYPE); + + // preload the queue + sendMessages(session, destination, 2000); + + final ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session + .createConsumer(destination); + + final Map<Thread, Throwable> exceptions = Collections + .synchronizedMap(new HashMap<Thread, Throwable>()); + Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() + { + @Override + public void uncaughtException(Thread t, Throwable e) + { + exceptions.put(t, e); + } + }); + + final class AckAndClose implements Runnable + { + private final Message message; + + public AckAndClose(Message m) + { + this.message = m; + } + + @Override + 
public void run() + { + try + { + int count = counter.incrementAndGet(); + if (count == 590) + { + // close in a separate thread is ok by jms + consumer.close(); + closeDone.countDown(); + } + if (count % 200 == 0) + { + // ensure there are some outstanding messages + // ack every 200 + message.acknowledge(); + } + } + catch (Exception e) + { + exceptions.put(Thread.currentThread(), e); + } + } + } + + final ExecutorService executor = Executors.newCachedThreadPool(); + consumer.setMessageListener(new MessageListener() + { + @Override + public void onMessage(Message m) + { + // ack and close eventually in separate thread + executor.execute(new AckAndClose(m)); + } + }); + + assertTrue(closeDone.await(20, TimeUnit.SECONDS)); + // await possible exceptions + Thread.sleep(1000); + assertTrue("no exceptions: " + exceptions, exceptions.isEmpty()); + } + + @Test + public void testDupsOkConsumer() throws Exception + { + + // Receive a message with the JMS API + connection.start(); + Session session = connection.createSession(false, + Session.DUPS_OK_ACKNOWLEDGE); + ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE); + MessageConsumer consumer = session.createConsumer(destination); + + // Send the messages + sendMessages(session, destination, 4); + + // Make sure only 4 message are delivered. + for (int i = 0; i < 4; i++) + { + Message m = consumer.receive(1000); + assertNotNull(m); + } + assertNull(consumer.receive(1000)); + + // Close out the consumer.. no other messages should be left on the queue. 
+ consumer.close(); + + consumer = session.createConsumer(destination); + assertNull(consumer.receive(1000)); + } + + @Test + public void testRedispatchOfUncommittedTx() throws Exception + { + connection.start(); + Session session = connection.createSession(true, + Session.SESSION_TRANSACTED); + ActiveMQDestination destination = createDestination(session, + ActiveMQDestination.QUEUE_TYPE); + + sendMessages(connection, destination, 2); + + MessageConsumer consumer = session.createConsumer(destination); + Message m = consumer.receive(1000); + System.out.println("m1 received: " + m); + assertNotNull(m); + m = consumer.receive(5000); + System.out.println("m2 received: " + m); + assertNotNull(m); + + // install another consumer while message dispatch is unacked/uncommitted + Session redispatchSession = connection.createSession(true, + Session.SESSION_TRANSACTED); + MessageConsumer redispatchConsumer = redispatchSession + .createConsumer(destination); + System.out.println("redispatch consumer: " + redispatchConsumer); + + // no commit so will auto rollback and get re-dispatched to + // redispatchConsumer + System.out.println("closing session: " + session); + session.close(); + + Message msg = redispatchConsumer.receive(3000); + assertNotNull(msg); + + assertTrue("redelivered flag set", msg.getJMSRedelivered()); + assertEquals(2, msg.getLongProperty("JMSXDeliveryCount")); + + msg = redispatchConsumer.receive(1000); + assertNotNull(msg); + assertTrue(msg.getJMSRedelivered()); + assertEquals(2, msg.getLongProperty("JMSXDeliveryCount")); + redispatchSession.commit(); + + assertNull(redispatchConsumer.receive(500)); + System.out.println("closing dispatch session: " + redispatchSession); + redispatchSession.close(); + } + + @Test + public void testRedispatchOfRolledbackTx() throws Exception + { + + connection.start(); + Session session = connection.createSession(true, + Session.SESSION_TRANSACTED); + ActiveMQDestination destination = createDestination(session, 
ActiveMQDestination.QUEUE_TYPE); + + sendMessages(connection, destination, 2); + + MessageConsumer consumer = session.createConsumer(destination); + assertNotNull(consumer.receive(1000)); + assertNotNull(consumer.receive(1000)); + + // install another consumer while message dispatch is unacked/uncommitted + Session redispatchSession = connection.createSession(true, + Session.SESSION_TRANSACTED); + MessageConsumer redispatchConsumer = redispatchSession + .createConsumer(destination); + + session.rollback(); + session.close(); + + Message msg = redispatchConsumer.receive(1000); + assertNotNull(msg); + assertTrue(msg.getJMSRedelivered()); + assertEquals(2, msg.getLongProperty("JMSXDeliveryCount")); + msg = redispatchConsumer.receive(1000); + assertNotNull(msg); + assertTrue(msg.getJMSRedelivered()); + assertEquals(2, msg.getLongProperty("JMSXDeliveryCount")); + redispatchSession.commit(); + + assertNull(redispatchConsumer.receive(500)); + redispatchSession.close(); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer3Test.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer3Test.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer3Test.java new file mode 100644 index 0000000..a444fff --- /dev/null +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer3Test.java @@ -0,0 +1,113 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.tests.integration.openwire.amq; + +import java.util.Arrays; +import java.util.Collection; + +import javax.jms.DeliveryMode; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.command.ActiveMQDestination; +import org.hornetq.tests.integration.openwire.BasicOpenWireTest; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * adapted from: org.apache.activemq.JMSConsumerTest + * + * @author <a href="mailto:[email protected]">Howard Gao</a> + * + */ +@RunWith(Parameterized.class) +public class JMSConsumer3Test extends BasicOpenWireTest +{ + @Parameterized.Parameters(name = "deliveryMode={0} ackMode={1} destinationType={2}") + public static Collection<Object[]> getParams() + { + return Arrays.asList(new Object[][] { + {DeliveryMode.NON_PERSISTENT, Session.AUTO_ACKNOWLEDGE, ActiveMQDestination.QUEUE_TYPE}, + {DeliveryMode.NON_PERSISTENT, Session.AUTO_ACKNOWLEDGE, ActiveMQDestination.TOPIC_TYPE}, + {DeliveryMode.NON_PERSISTENT, Session.AUTO_ACKNOWLEDGE, ActiveMQDestination.TEMP_QUEUE_TYPE}, + {DeliveryMode.NON_PERSISTENT, Session.AUTO_ACKNOWLEDGE, ActiveMQDestination.TEMP_TOPIC_TYPE}, + {DeliveryMode.NON_PERSISTENT, Session.DUPS_OK_ACKNOWLEDGE, ActiveMQDestination.QUEUE_TYPE}, + {DeliveryMode.NON_PERSISTENT, Session.DUPS_OK_ACKNOWLEDGE, ActiveMQDestination.TOPIC_TYPE}, + {DeliveryMode.NON_PERSISTENT, Session.DUPS_OK_ACKNOWLEDGE, ActiveMQDestination.TEMP_QUEUE_TYPE}, + 
{DeliveryMode.NON_PERSISTENT, Session.DUPS_OK_ACKNOWLEDGE, ActiveMQDestination.TEMP_TOPIC_TYPE}, + {DeliveryMode.NON_PERSISTENT, Session.CLIENT_ACKNOWLEDGE, ActiveMQDestination.QUEUE_TYPE}, + {DeliveryMode.NON_PERSISTENT, Session.CLIENT_ACKNOWLEDGE, ActiveMQDestination.TOPIC_TYPE}, + {DeliveryMode.NON_PERSISTENT, Session.CLIENT_ACKNOWLEDGE, ActiveMQDestination.TEMP_QUEUE_TYPE}, + {DeliveryMode.NON_PERSISTENT, Session.CLIENT_ACKNOWLEDGE, ActiveMQDestination.TEMP_TOPIC_TYPE}, + {DeliveryMode.PERSISTENT, Session.AUTO_ACKNOWLEDGE, ActiveMQDestination.QUEUE_TYPE}, + {DeliveryMode.PERSISTENT, Session.AUTO_ACKNOWLEDGE, ActiveMQDestination.TOPIC_TYPE}, + {DeliveryMode.PERSISTENT, Session.AUTO_ACKNOWLEDGE, ActiveMQDestination.TEMP_QUEUE_TYPE}, + {DeliveryMode.PERSISTENT, Session.AUTO_ACKNOWLEDGE, ActiveMQDestination.TEMP_TOPIC_TYPE}, + {DeliveryMode.PERSISTENT, Session.DUPS_OK_ACKNOWLEDGE, ActiveMQDestination.QUEUE_TYPE}, + {DeliveryMode.PERSISTENT, Session.DUPS_OK_ACKNOWLEDGE, ActiveMQDestination.TOPIC_TYPE}, + {DeliveryMode.PERSISTENT, Session.DUPS_OK_ACKNOWLEDGE, ActiveMQDestination.TEMP_QUEUE_TYPE}, + {DeliveryMode.PERSISTENT, Session.DUPS_OK_ACKNOWLEDGE, ActiveMQDestination.TEMP_TOPIC_TYPE}, + {DeliveryMode.PERSISTENT, Session.CLIENT_ACKNOWLEDGE, ActiveMQDestination.QUEUE_TYPE}, + {DeliveryMode.PERSISTENT, Session.CLIENT_ACKNOWLEDGE, ActiveMQDestination.TOPIC_TYPE}, + {DeliveryMode.PERSISTENT, Session.CLIENT_ACKNOWLEDGE, ActiveMQDestination.TEMP_QUEUE_TYPE}, + {DeliveryMode.PERSISTENT, Session.CLIENT_ACKNOWLEDGE, ActiveMQDestination.TEMP_TOPIC_TYPE} + }); + } + + public ActiveMQDestination destination; + public int deliveryMode; + public int prefetch; + public int ackMode; + public byte destinationType; + public boolean durableConsumer; + + public JMSConsumer3Test(int deliveryMode, int ackMode, byte destinationType) + { + this.deliveryMode = deliveryMode; + this.ackMode = ackMode; + this.destinationType = destinationType; + } + + @Test + public void 
testMutiReceiveWithPrefetch1() throws Exception + { + // Set prefetch to 1 + ((ActiveMQConnection)connection).getPrefetchPolicy().setAll(1); + connection.start(); + + // Use all the ack modes + Session session = connection.createSession(false, ackMode); + destination = createDestination(session, destinationType); + MessageConsumer consumer = session.createConsumer(destination); + + // Send the messages + sendMessages(session, destination, 4); + + System.out.println("messages are sent."); + // Make sure 4 messages were delivered. + Message message = null; + for (int i = 0; i < 4; i++) + { + message = consumer.receive(5000); + System.out.println("message received: " + message + " ack mode: " + ackMode); + assertNotNull(message); + } + assertNull(consumer.receiveNoWait()); + message.acknowledge(); + } + +} + http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer4Test.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer4Test.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer4Test.java new file mode 100644 index 0000000..d4fe157 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer4Test.java @@ -0,0 +1,103 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. 
See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.tests.integration.openwire.amq; + +import java.util.Arrays; +import java.util.Collection; + +import javax.jms.DeliveryMode; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import org.apache.activemq.command.ActiveMQDestination; +import org.hornetq.tests.integration.openwire.BasicOpenWireTest; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * adapted from: org.apache.activemq.JMSConsumerTest + * + * @author <a href="mailto:[email protected]">Howard Gao</a> + * + */ +@RunWith(Parameterized.class) +public class JMSConsumer4Test extends BasicOpenWireTest +{ + @Parameterized.Parameters(name = "deliveryMode={0} destinationType={1}") + public static Collection<Object[]> getParams() + { + return Arrays.asList(new Object[][] { + {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.TOPIC_TYPE}, + {DeliveryMode.PERSISTENT, ActiveMQDestination.TOPIC_TYPE} + }); + } + + public int deliveryMode; + public byte destinationType; + + public JMSConsumer4Test(int deliveryMode, byte destinationType) + { + this.deliveryMode = deliveryMode; + this.destinationType = destinationType; + } + + @Test + public void testDurableConsumerSelectorChange() throws Exception + { + // Receive a message with the JMS API + connection.setClientID("test"); + connection.start(); + Session session = connection.createSession(false, + Session.AUTO_ACKNOWLEDGE); + ActiveMQDestination destination = createDestination(session, + destinationType); + MessageProducer producer = session.createProducer(destination); + producer.setDeliveryMode(deliveryMode); + MessageConsumer consumer = session.createDurableSubscriber( + (Topic) destination, "test", "color='red'", false); + + // Send the messages + 
TextMessage message = session.createTextMessage("1st"); + message.setStringProperty("color", "red"); + producer.send(message); + + Message m = consumer.receive(1000); + assertNotNull(m); + assertEquals("1st", ((TextMessage) m).getText()); + + // Change the subscription. + consumer.close(); + consumer = session.createDurableSubscriber((Topic) destination, "test", + "color='blue'", false); + + message = session.createTextMessage("2nd"); + message.setStringProperty("color", "red"); + producer.send(message); + message = session.createTextMessage("3rd"); + message.setStringProperty("color", "blue"); + producer.send(message); + + // Selector should skip the 2nd message. + m = consumer.receive(1000); + assertNotNull(m); + assertEquals("3rd", ((TextMessage) m).getText()); + + assertNull(consumer.receiveNoWait()); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer5Test.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer5Test.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer5Test.java new file mode 100644 index 0000000..70d7c41 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer5Test.java @@ -0,0 +1,135 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. 
See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.tests.integration.openwire.amq; + +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.BytesMessage; +import javax.jms.DeliveryMode; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.command.ActiveMQDestination; +import org.hornetq.tests.integration.openwire.BasicOpenWireTest; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * adapted from: org.apache.activemq.JMSConsumerTest + * + * @author <a href="mailto:[email protected]">Howard Gao</a> + * + */ +@RunWith(Parameterized.class) +public class JMSConsumer5Test extends BasicOpenWireTest +{ + @Parameterized.Parameters(name = "deliveryMode={0} destinationType={1}") + public static Collection<Object[]> getParams() + { + return Arrays.asList(new Object[][] { + {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.QUEUE_TYPE}, + {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.TOPIC_TYPE}, + {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.TEMP_QUEUE_TYPE}, + {DeliveryMode.NON_PERSISTENT, ActiveMQDestination.TEMP_TOPIC_TYPE}, + {DeliveryMode.PERSISTENT, ActiveMQDestination.QUEUE_TYPE}, + {DeliveryMode.PERSISTENT, ActiveMQDestination.TOPIC_TYPE}, + {DeliveryMode.PERSISTENT, ActiveMQDestination.TEMP_QUEUE_TYPE}, + {DeliveryMode.PERSISTENT, ActiveMQDestination.TEMP_TOPIC_TYPE} + }); + } + + public int deliveryMode; + public byte destinationType; + + public JMSConsumer5Test(int deliveryMode, byte destinationType) + { + this.deliveryMode = deliveryMode; + this.destinationType = destinationType; + } + + @Test + public void testSendReceiveBytesMessage() 
throws Exception + { + // Receive a message with the JMS API + connection.start(); + Session session = connection.createSession(false, + Session.AUTO_ACKNOWLEDGE); + ActiveMQDestination destination = createDestination(session, + destinationType); + MessageConsumer consumer = session.createConsumer(destination); + MessageProducer producer = session.createProducer(destination); + + BytesMessage message = session.createBytesMessage(); + message.writeBoolean(true); + message.writeBoolean(false); + producer.send(message); + + // Make sure only 1 message was delivered. + BytesMessage m = (BytesMessage) consumer.receive(1000); + assertNotNull(m); + assertTrue(m.readBoolean()); + assertFalse(m.readBoolean()); + + assertNull(consumer.receiveNoWait()); + } + + @Test + public void testSetMessageListenerAfterStart() throws Exception + { + final AtomicInteger counter = new AtomicInteger(0); + final CountDownLatch done = new CountDownLatch(1); + + // Receive a message with the JMS API + connection.start(); + Session session = connection.createSession(false, + Session.AUTO_ACKNOWLEDGE); + ActiveMQDestination destination = createDestination(session, + destinationType); + MessageConsumer consumer = session.createConsumer(destination); + + // Send the messages + sendMessages(session, destination, 4); + + // See if the message get sent to the listener + consumer.setMessageListener(new MessageListener() + { + @Override + public void onMessage(Message m) + { + counter.incrementAndGet(); + if (counter.get() == 4) + { + System.out.println("ok finished all 4, done sleep"); + done.countDown(); + } + } + }); + + assertTrue(done.await(1000, TimeUnit.MILLISECONDS)); + System.out.println("ok await ok"); + Thread.sleep(200); + + // Make sure only 4 messages were delivered. 
+ assertEquals(4, counter.get()); + System.out.println("test done ok " + counter.get()); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer6Test.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer6Test.java b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer6Test.java new file mode 100644 index 0000000..d17dd06 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/openwire/amq/JMSConsumer6Test.java @@ -0,0 +1,146 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.hornetq.tests.integration.openwire.amq; + +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.ActiveMQMessageConsumer; +import org.apache.activemq.ActiveMQSession; +import org.apache.activemq.command.ActiveMQDestination; +import org.hornetq.tests.integration.openwire.BasicOpenWireTest; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * adapted from: org.apache.activemq.JMSConsumerTest + * + * @author <a href="mailto:[email protected]">Howard Gao</a> + * + */ +@RunWith(Parameterized.class) +public class JMSConsumer6Test extends BasicOpenWireTest +{ + @Parameterized.Parameters(name = "destinationType={0}") + public static Collection<Object[]> getParams() + { + return Arrays.asList(new Object[][] { + {ActiveMQDestination.QUEUE_TYPE}, + {ActiveMQDestination.TOPIC_TYPE} + }); + } + + public byte destinationType; + + public JMSConsumer6Test(byte destinationType) + { + this.destinationType = destinationType; + } + + @Test + public void testPassMessageListenerIntoCreateConsumer() throws Exception + { + + final AtomicInteger counter = new AtomicInteger(0); + final CountDownLatch done = new CountDownLatch(1); + + // Receive a message with the JMS API + connection.start(); + ActiveMQSession session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + ActiveMQDestination destination = createDestination(session, destinationType); + MessageConsumer consumer = session.createConsumer(destination, + new MessageListener() + { + @Override + public void onMessage(Message m) + { + counter.incrementAndGet(); + if (counter.get() 
== 4) + { + done.countDown(); + } + } + }); + assertNotNull(consumer); + + // Send the messages + sendMessages(session, destination, 4); + + assertTrue(done.await(1000, TimeUnit.MILLISECONDS)); + Thread.sleep(200); + + // Make sure only 4 messages were delivered. + assertEquals(4, counter.get()); + } + + @Test + public void testAckOfExpired() throws Exception + { + connection.start(); + Session session = connection.createSession(false, + Session.AUTO_ACKNOWLEDGE); + ActiveMQDestination destination = createDestination(session, + destinationType); + + MessageConsumer consumer = session.createConsumer(destination); + connection.setStatsEnabled(true); + + Session sendSession = connection.createSession(false, + Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = sendSession.createProducer(destination); + producer.setTimeToLive(1000); + final int count = 4; + for (int i = 0; i < count; i++) + { + TextMessage message = sendSession.createTextMessage("" + i); + producer.send(message); + } + + // let first bunch in queue expire + Thread.sleep(2000); + + producer.setTimeToLive(0); + for (int i = 0; i < count; i++) + { + TextMessage message = sendSession.createTextMessage("no expiry" + i); + producer.send(message); + } + + ActiveMQMessageConsumer amqConsumer = (ActiveMQMessageConsumer) consumer; + + for (int i = 0; i < count; i++) + { + TextMessage msg = (TextMessage) amqConsumer.receive(); + assertNotNull(msg); + assertTrue("message has \"no expiry\" text: " + msg.getText(), msg + .getText().contains("no expiry")); + + // force an ack when there are expired messages + amqConsumer.acknowledge(); + } + assertEquals("consumer has expiredMessages", count, amqConsumer + .getConsumerStats().getExpiredMessageCount().getCount()); + } + +}
