http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/FailoverProviderTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/FailoverProviderTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/FailoverProviderTest.java new file mode 100644 index 0000000..a458ab7 --- /dev/null +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/FailoverProviderTest.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.failover; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.net.URI; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.qpid.jms.provider.DefaultProviderListener; +import org.apache.qpid.jms.provider.Provider; +import org.apache.qpid.jms.provider.failover.FailoverProvider; +import org.apache.qpid.jms.provider.failover.FailoverProviderFactory; +import org.apache.qpid.jms.support.AmqpTestSupport; +import org.junit.Test; + +/** + * Test basic functionality of the FailoverProvider class. + */ +public class FailoverProviderTest extends AmqpTestSupport { + + @Test(timeout=60000) + public void testFailoverCreate() throws Exception { + URI brokerURI = new URI("failover:" + getBrokerAmqpConnectionURI()); + Provider asyncProvider = FailoverProviderFactory.createAsync(brokerURI); + assertNotNull(asyncProvider); + FailoverProvider provider = (FailoverProvider) asyncProvider; + assertNotNull(provider); + } + + @Test(timeout=60000) + public void testFailoverURIConfiguration() throws Exception { + URI brokerURI = new URI("failover://(" + getBrokerAmqpConnectionURI() + ")" + + "?maxReconnectDelay=1000&useExponentialBackOff=false" + + "&maxReconnectAttempts=10&startupMaxReconnectAttempts=20"); + Provider asyncProvider = FailoverProviderFactory.createAsync(brokerURI); + assertNotNull(asyncProvider); + FailoverProvider provider = (FailoverProvider) asyncProvider; + assertNotNull(provider); + + assertEquals(1000, provider.getMaxReconnectDelay()); + assertFalse(provider.isUseExponentialBackOff()); + assertEquals(10, provider.getMaxReconnectAttempts()); + assertEquals(20, provider.getStartupMaxReconnectAttempts()); + } + + @Test(timeout=60000) + public void testStartupReconnectAttempts() throws Exception { + URI brokerURI = new URI("failover://(amqp://localhost:61616)" + + "?maxReconnectDelay=100&startupMaxReconnectAttempts=5"); + Provider asyncProvider = FailoverProviderFactory.createAsync(brokerURI); + assertNotNull(asyncProvider); + FailoverProvider provider = (FailoverProvider) asyncProvider; + assertNotNull(provider); + + assertEquals(100, provider.getMaxReconnectDelay()); + assertEquals(5, provider.getStartupMaxReconnectAttempts()); + + final CountDownLatch failed = new CountDownLatch(1); + + provider.setProviderListener(new DefaultProviderListener() { + + @Override + public void onConnectionFailure(IOException ex) { + failed.countDown(); + } + }); + + provider.connect(); + + assertTrue(failed.await(2, TimeUnit.SECONDS)); + } +}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsFailoverTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsFailoverTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsFailoverTest.java new file mode 100644 index 0000000..d615f3d --- /dev/null +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsFailoverTest.java @@ -0,0 +1,283 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.failover; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.net.URI; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.Topic; + +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.qpid.jms.JmsConnectionFactory; +import org.apache.qpid.jms.support.AmqpTestSupport; +import org.apache.qpid.jms.support.Wait; +import org.junit.Ignore; +import org.junit.Test; + +/** + * Basic tests for the FailoverProvider implementation + */ +public class JmsFailoverTest extends AmqpTestSupport { + + @Override + protected boolean isPersistent() { + return true; + } + + @Test(timeout=60000) + public void testFailoverConnects() throws Exception { + URI brokerURI = new URI(getAmqpFailoverURI()); + Connection connection = createAmqpConnection(brokerURI); + connection.start(); + connection.close(); + } + + @Test(timeout=60000) + public void testFailoverConnectsWithMultipleURIs() throws Exception { + URI brokerURI = new URI("failover://(amqp://127.0.0.1:61616,amqp://localhost:5777," + + getBrokerAmqpConnectionURI() + ")?maxReconnectDelay=500"); + Connection connection = createAmqpConnection(brokerURI); + connection.start(); + connection.close(); + } + + @Test(timeout=60000, expected=JMSException.class) + public void testStartupReconnectAttempts() throws Exception { + URI brokerURI = new URI("failover://(amqp://localhost:61616)" + + "?maxReconnectDelay=1000&startupMaxReconnectAttempts=5"); + JmsConnectionFactory factory = new JmsConnectionFactory(brokerURI); + Connection connection = factory.createConnection(); + connection.start(); + } + + @Test(timeout=60000, expected=JMSException.class) + public void testStartupReconnectAttemptsMultipleHosts() throws Exception { + URI brokerURI = new URI("failover://(amqp://localhost:61616,amqp://localhost:61617)" + + "?maxReconnectDelay=1000&startupMaxReconnectAttempts=5"); + JmsConnectionFactory factory = new JmsConnectionFactory(brokerURI); + Connection connection = factory.createConnection(); + connection.start(); + } + + @Test(timeout=60000) + public void testStartFailureWithAsyncExceptionListener() throws Exception { + URI brokerURI = new URI(getAmqpFailoverURI() + "?maxReconnectDelay=1000&maxReconnectAttempts=5"); + + final CountDownLatch failed = new CountDownLatch(1); + JmsConnectionFactory factory = new JmsConnectionFactory(brokerURI); + factory.setExceptionListener(new ExceptionListener() { + + @Override + public void onException(JMSException exception) { + LOG.info("Connection got exception: {}", exception.getMessage()); + failed.countDown(); + } + }); + Connection connection = factory.createConnection(); + connection.start(); + + stopPrimaryBroker(); + + assertTrue("No async exception", failed.await(15, TimeUnit.SECONDS)); + } + + @SuppressWarnings("unused") + @Test(timeout=60000) + public void testBasicStateRestoration() throws Exception { + URI brokerURI = new URI(getAmqpFailoverURI() + "?maxReconnectDelay=1000"); + + Connection connection = createAmqpConnection(brokerURI); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(name.getMethodName()); + MessageProducer producer = session.createProducer(queue); + MessageConsumer consumer = session.createConsumer(queue); + + assertEquals(1, brokerService.getAdminView().getQueueSubscribers().length); + assertEquals(1, brokerService.getAdminView().getQueueProducers().length); + + restartPrimaryBroker(); + + assertTrue("Should have a new connection.", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return brokerService.getAdminView().getCurrentConnectionsCount() == 1; + } + })); + + assertEquals(1, brokerService.getAdminView().getQueueSubscribers().length); + assertEquals(1, brokerService.getAdminView().getQueueProducers().length); + + connection.close(); + } + + @SuppressWarnings("unused") + @Test(timeout=60000) + public void testDurableSubscriberRestores() throws Exception { + URI brokerURI = new URI(getAmqpFailoverURI() + "?maxReconnectDelay=1000"); + + Connection connection = createAmqpConnection(brokerURI); + connection.setClientID(name.getMethodName()); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic(name.getMethodName()); + MessageConsumer consumer = session.createDurableSubscriber(topic, name.getMethodName()); + + assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length); + + restartPrimaryBroker(); + + assertTrue("Should have a new connection.", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return brokerService.getAdminView().getCurrentConnectionsCount() == 1; + } + })); + + assertTrue("Should have no inactive subscribers.", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return brokerService.getAdminView().getInactiveDurableTopicSubscribers().length == 0; + } + })); + + assertTrue("Should have one durable sub.", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return brokerService.getAdminView().getDurableTopicSubscribers().length == 1; + } + })); + + connection.close(); + } + + @Test(timeout=90000) + public void testBadFirstURIConnectsAndProducerWorks() throws Exception { + URI brokerURI = new URI("failover://(amqp://localhost:61616," + + getBrokerAmqpConnectionURI() + ")?maxReconnectDelay=1000"); + + Connection connection = createAmqpConnection(brokerURI); + connection.start(); + + final int MSG_COUNT = 10; + final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(name.getMethodName()); + final MessageProducer producer = session.createProducer(queue); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + final CountDownLatch failed = new CountDownLatch(1); + + assertEquals(1, brokerService.getAdminView().getQueueProducers().length); + + for (int i = 0; i < MSG_COUNT; ++i) { + producer.send(session.createTextMessage("Message: " + i)); + } + + final QueueViewMBean proxy = getProxyToQueue(name.getMethodName()); + + assertTrue("Should have all messages sent.", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return proxy.getQueueSize() == MSG_COUNT; + } + })); + + assertFalse(failed.getCount() == 0); + connection.close(); + } + + // TODO - FIXME + @Ignore("Test currently not working") + @Test(timeout=90000) + public void testProducerBlocksAndRecovers() throws Exception { + URI brokerURI = new URI("failover://("+ getBrokerAmqpConnectionURI() +")?maxReconnectDelay=1000"); + + Connection connection = createAmqpConnection(brokerURI); + connection.start(); + + final int MSG_COUNT = 10; + final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(name.getMethodName()); + final MessageProducer producer = session.createProducer(queue); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + final CountDownLatch failed = new CountDownLatch(1); + + assertEquals(1, brokerService.getAdminView().getQueueProducers().length); + + Thread producerThread = new Thread(new Runnable() { + + @Override + public void run() { + try { + for (int i = 0; i < MSG_COUNT; ++i) { + producer.send(session.createTextMessage("Message: " + i)); + TimeUnit.SECONDS.sleep(1); + } + } catch (Exception e) { + } + } + }); + producerThread.start(); + + TimeUnit.SECONDS.sleep(3); + stopPrimaryBroker(); + TimeUnit.SECONDS.sleep(3); + restartPrimaryBroker(); + + assertTrue("Should have a new connection.", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return brokerService.getAdminView().getCurrentConnectionsCount() == 1; + } + })); + + assertEquals(1, brokerService.getAdminView().getQueueProducers().length); + + final QueueViewMBean proxy = getProxyToQueue(name.getMethodName()); + + assertTrue("Should have all messages sent.", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return proxy.getQueueSize() == MSG_COUNT; + } + })); + + assertFalse(failed.getCount() == 0); + connection.close(); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsOfflineBehaviorTests.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsOfflineBehaviorTests.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsOfflineBehaviorTests.java new file mode 100644 index 0000000..bc971f6 --- /dev/null +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsOfflineBehaviorTests.java @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.failover; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.net.URI; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.qpid.jms.JmsConnection; +import org.apache.qpid.jms.provider.Provider; +import org.apache.qpid.jms.support.AmqpTestSupport; +import org.apache.qpid.jms.support.Wait; +import org.junit.Test; + +/** + * Test various client behaviors when the connection has gone offline. + */ +public class JmsOfflineBehaviorTests extends AmqpTestSupport { + + @Test(timeout=60000) + public void testConnectionCloseDoesNotBlock() throws Exception { + URI brokerURI = new URI(getAmqpFailoverURI()); + Connection connection = createAmqpConnection(brokerURI); + connection.start(); + stopPrimaryBroker(); + connection.close(); + } + + @Test(timeout=60000) + public void testSessionCloseDoesNotBlock() throws Exception { + URI brokerURI = new URI(getAmqpFailoverURI()); + Connection connection = createAmqpConnection(brokerURI); + connection.start(); + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + stopPrimaryBroker(); + session.close(); + connection.close(); + } + + @Test(timeout=60000) + public void testClientAckDoesNotBlock() throws Exception { + URI brokerURI = new URI(getAmqpFailoverURI()); + Connection connection = createAmqpConnection(brokerURI); + connection.start(); + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Queue queue = session.createQueue(name.getMethodName()); + MessageConsumer consumer = session.createConsumer(queue); + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("Test")); + + Message message = consumer.receive(5000); + assertNotNull(message); + stopPrimaryBroker(); + message.acknowledge(); + + connection.close(); + } + + @Test(timeout=60000) + public void testProducerCloseDoesNotBlock() throws Exception { + URI brokerURI = new URI(getAmqpFailoverURI()); + Connection connection = createAmqpConnection(brokerURI); + connection.start(); + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Queue queue = session.createQueue(name.getMethodName()); + MessageProducer producer = session.createProducer(queue); + + stopPrimaryBroker(); + producer.close(); + connection.close(); + } + + @Test(timeout=60000) + public void testConsumerCloseDoesNotBlock() throws Exception { + URI brokerURI = new URI(getAmqpFailoverURI()); + Connection connection = createAmqpConnection(brokerURI); + connection.start(); + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Queue queue = session.createQueue(name.getMethodName()); + MessageConsumer consumer = session.createConsumer(queue); + + stopPrimaryBroker(); + consumer.close(); + connection.close(); + } + + @SuppressWarnings("unused") + @Test(timeout=60000) + public void testSessionCloseWithOpenResourcesDoesNotBlock() throws Exception { + URI brokerURI = new URI(getAmqpFailoverURI()); + Connection connection = createAmqpConnection(brokerURI); + connection.start(); + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Queue queue = session.createQueue(name.getMethodName()); + MessageConsumer consumer = session.createConsumer(queue); + MessageProducer producer = session.createProducer(queue); + + stopPrimaryBroker(); + session.close(); + connection.close(); + } + + @Test(timeout=60000) + public void testGetRemoteURI() throws Exception { + + startNewBroker(); + + URI brokerURI = new URI(getAmqpFailoverURI() + "randomize=false"); + Connection connection = createAmqpConnection(brokerURI); + connection.start(); + + JmsConnection jmsConnection = (JmsConnection) connection; + final Provider provider = jmsConnection.getProvider(); + + URI connectedURI = provider.getRemoteURI(); + assertNotNull(connectedURI); + + final List<URI> brokers = getBrokerURIs(); + assertEquals(brokers.get(0), connectedURI); + + stopPrimaryBroker(); + + assertTrue("Should connect to secondary URI.", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return provider.getRemoteURI().equals(brokers.get(1)); + } + })); + + connection.close(); + } + + @SuppressWarnings("unused") + @Test(timeout=60000) + public void testClosedReourcesAreNotRestored() throws Exception { + URI brokerURI = new URI(getAmqpFailoverURI() + "?maxReconnectDelay=500"); + Connection connection = createAmqpConnection(brokerURI); + connection.start(); + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Queue queue = session.createQueue(name.getMethodName()); + MessageConsumer consumer = session.createConsumer(queue); + MessageProducer producer = session.createProducer(queue); + + assertEquals(1, brokerService.getAdminView().getQueueSubscribers().length); + assertEquals(1, brokerService.getAdminView().getQueueProducers().length); + + stopPrimaryBroker(); + session.close(); + TimeUnit.SECONDS.sleep(2); + restartPrimaryBroker(); + + assertTrue("Should have a new connection.", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return brokerService.getAdminView().getCurrentConnectionsCount() == 1; + } + })); + + assertEquals(0, brokerService.getAdminView().getQueueSubscribers().length); + assertEquals(0, brokerService.getAdminView().getQueueProducers().length); + + connection.close(); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/joram/ActiveMQAdmin.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/joram/ActiveMQAdmin.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/joram/ActiveMQAdmin.java new file mode 100644 index 0000000..f1e7db1 --- /dev/null +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/joram/ActiveMQAdmin.java @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.joram; + +import java.io.File; +import java.net.URI; +import java.util.Hashtable; + +import javax.jms.ConnectionFactory; +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NamingException; + +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.qpid.jms.JmsConnectionFactory; +import org.apache.qpid.jms.JmsQueue; +import org.apache.qpid.jms.JmsTopic; +import org.objectweb.jtests.jms.admin.Admin; + +/** + * + */ +public class ActiveMQAdmin implements Admin { + + Context context; + { + try { + // Use the jetty JNDI context since it's mutable. + final Hashtable<String, String> env = new Hashtable<String, String>(); + env.put("java.naming.factory.initial", "org.eclipse.jetty.jndi.InitialContextFactory"); + env.put("java.naming.factory.url.pkgs", "org.eclipse.jetty.jndi"); + ; + context = new InitialContext(env); + } catch (NamingException e) { + throw new RuntimeException(e); + } + } + + protected BrokerService createBroker() throws Exception { + return BrokerFactory.createBroker(new URI("broker://()/localhost?persistent=false")); + } + + @Override + public String getName() { + return getClass().getName(); + } + + static BrokerService broker; + static int port; + + @Override + public void startServer() throws Exception { + if (broker != null) { + stopServer(); + } + if (System.getProperty("basedir") == null) { + File file = new File("."); + System.setProperty("basedir", file.getAbsolutePath()); + } + broker = createBroker(); + TransportConnector connector = broker.addConnector(getConnectorURI()); + broker.start(); + port = connector.getConnectUri().getPort(); + } + + protected String getConnectorURI() { + return "amqp://localhost:0"; + } + + @Override + public void stopServer() throws Exception { + broker.stop(); + broker = null; + } + + @Override + public void start() throws Exception { + } + + @Override + public void stop() throws Exception { + } + + @Override + public Context createContext() throws NamingException { + return context; + } + + @Override + public void createQueue(String name) { + try { + context.bind(name, new JmsQueue(name)); + } catch (NamingException e) { + throw new RuntimeException(e); + } + } + + @Override + public void createTopic(String name) { + try { + context.bind(name, new JmsTopic(name)); + } catch (NamingException e) { + throw new RuntimeException(e); + } + } + + @Override + public void deleteQueue(String name) { + try { + context.unbind(name); + } catch (NamingException e) { + throw new RuntimeException(e); + } + } + + @Override + public void deleteTopic(String name) { + try { + context.unbind(name); + } catch (NamingException e) { + throw new RuntimeException(e); + } + } + + @Override + public void createConnectionFactory(String name) { + try { + final ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:" + port); + context.bind(name, factory); + } catch (NamingException e) { + throw new RuntimeException(e); + } + } + + @Override + public void deleteConnectionFactory(String name) { + try { + context.unbind(name); + } catch (NamingException e) { + throw new RuntimeException(e); + } + } + + @Override + public void createQueueConnectionFactory(String name) { + createConnectionFactory(name); + } + + @Override + public void createTopicConnectionFactory(String name) { + createConnectionFactory(name); + } + + @Override + public void deleteQueueConnectionFactory(String name) { + deleteConnectionFactory(name); + } + + @Override + public void deleteTopicConnectionFactory(String name) { + deleteConnectionFactory(name); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/joram/JoramJmsTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/joram/JoramJmsTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/joram/JoramJmsTest.java new file mode 100644 index 0000000..3e59b69 --- /dev/null +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/joram/JoramJmsTest.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.joram; + +import junit.framework.Test; +import junit.framework.TestCase; +import junit.framework.TestSuite; + +import org.objectweb.jtests.jms.conform.connection.ConnectionTest; +import org.objectweb.jtests.jms.conform.connection.TopicConnectionTest; +import org.objectweb.jtests.jms.conform.message.MessageBodyTest; +import org.objectweb.jtests.jms.conform.message.MessageDefaultTest; +import org.objectweb.jtests.jms.conform.message.headers.MessageHeaderTest; +import org.objectweb.jtests.jms.conform.message.properties.MessagePropertyConversionTest; +import org.objectweb.jtests.jms.conform.message.properties.MessagePropertyTest; +import org.objectweb.jtests.jms.conform.queue.TemporaryQueueTest; +import org.objectweb.jtests.jms.conform.selector.SelectorSyntaxTest; +import org.objectweb.jtests.jms.conform.selector.SelectorTest; +import org.objectweb.jtests.jms.conform.session.QueueSessionTest; +import org.objectweb.jtests.jms.conform.topic.TemporaryTopicTest; + +public class JoramJmsTest extends TestCase { + + public static Test suite() { + TestSuite suite = new TestSuite(); + + // TODO: Fix these tests.. + // Fails due to + // https://issues.apache.org/jira/browse/PROTON-154 + // suite.addTestSuite(TopicSessionTest.class); + +// suite.addTestSuite(MessageTypeTest.class); +// suite.addTestSuite(UnifiedSessionTest.class); +// suite.addTestSuite(JMSXPropertyTest.class); +// suite.addTestSuite(SessionTest.class); + +// suite.addTestSuite(QueueBrowserTest.class); + suite.addTestSuite(QueueSessionTest.class); + suite.addTestSuite(SelectorSyntaxTest.class); + suite.addTestSuite(SelectorTest.class); + suite.addTestSuite(MessageHeaderTest.class); + suite.addTestSuite(TemporaryTopicTest.class); + suite.addTestSuite(TemporaryQueueTest.class); + suite.addTestSuite(TopicConnectionTest.class); + suite.addTestSuite(ConnectionTest.class); + suite.addTestSuite(MessageBodyTest.class); + suite.addTestSuite(MessageDefaultTest.class); + suite.addTestSuite(MessagePropertyConversionTest.class); + suite.addTestSuite(MessagePropertyTest.class); + + return suite; + } + + public static void main(String[] args) { + junit.textui.TestRunner.run(suite()); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsAnonymousProducerTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsAnonymousProducerTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsAnonymousProducerTest.java new file mode 100644 index 0000000..04b954a --- /dev/null +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsAnonymousProducerTest.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.producer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.qpid.jms.support.AmqpTestSupport; +import org.junit.Test; + +/** + * Test JMS Anonymous Producer functionality. + */ +public class JmsAnonymousProducerTest extends AmqpTestSupport { + + @Test(timeout = 60000) + public void testCreateProducer() throws Exception { + connection = createAmqpConnection(); + assertNotNull(connection); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + assertNotNull(session); + session.createProducer(null); + + assertTrue(brokerService.getAdminView().getTotalProducerCount() == 0); + } + + @Test(timeout = 60000) + public void testAnonymousSend() throws Exception { + connection = createAmqpConnection(); + assertNotNull(connection); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(name.getMethodName()); + assertNotNull(session); + MessageProducer producer = session.createProducer(null); + + Message message = session.createMessage(); + producer.send(queue, message); + + QueueViewMBean proxy = getProxyToQueue(name.getMethodName()); + assertEquals(1, proxy.getQueueSize()); + } + + @Test(timeout = 60000) + public void testAnonymousSendToMultipleDestinations() throws Exception { + connection = createAmqpConnection(); + assertNotNull(connection); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue1 = session.createQueue(name.getMethodName() + 1); + Queue queue2 = session.createQueue(name.getMethodName() + 2); + Queue queue3 = session.createQueue(name.getMethodName() + 3); + assertNotNull(session); + MessageProducer producer = session.createProducer(null); + + Message message = session.createMessage(); + producer.send(queue1, message); + producer.send(queue2, message); + producer.send(queue3, message); + + QueueViewMBean proxy = getProxyToQueue(name.getMethodName() + 1); + assertEquals(1, proxy.getQueueSize()); + proxy = getProxyToQueue(name.getMethodName() + 2); + assertEquals(1, proxy.getQueueSize()); + proxy = getProxyToQueue(name.getMethodName() + 3); + assertEquals(1, proxy.getQueueSize()); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsMessageProducerClosedTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsMessageProducerClosedTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsMessageProducerClosedTest.java new file mode 100644 index 0000000..a7b8cdf --- /dev/null +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsMessageProducerClosedTest.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.producer; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.qpid.jms.support.AmqpTestSupport; +import org.junit.Test; + +/** + * Test the contract of MessageProducer that has been closed. + */ +public class JmsMessageProducerClosedTest extends AmqpTestSupport { + + protected MessageProducer producer; + protected Message message; + protected Destination destination; + + protected MessageProducer createProducer() throws Exception { + connection = createAmqpConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + message = session.createMessage(); + destination = session.createTopic("test"); + MessageProducer producer = session.createProducer(destination); + producer.close(); + return producer; + } + + @Override + public void setUp() throws Exception { + super.setUp(); + producer = createProducer(); + } + + @Test(timeout=30000) + public void testClose() throws JMSException { + producer.close(); + } + + @Test(timeout=30000, expected=JMSException.class) + public void testSetDisableMessageIDFails() throws JMSException { + producer.setDisableMessageID(true); + } + + @Test(timeout=30000, expected=JMSException.class) + public void testGetDisableMessageIDFails() throws JMSException { + producer.getDisableMessageID(); + } + + @Test(timeout=30000, expected=JMSException.class) + public void testSetDisableMessageTimestampFails() throws JMSException { + producer.setDisableMessageTimestamp(false); + } + + @Test(timeout=30000, expected=JMSException.class) + public void testGetDisableMessageTimestampFails() throws JMSException { + producer.getDisableMessageTimestamp(); + } + + @Test(timeout=30000, expected=JMSException.class) + public void testSetDeliveryModeFails() throws JMSException { + producer.setDeliveryMode(1); + } + + @Test(timeout=30000, expected=JMSException.class) + public void testGetDeliveryModeFails() throws JMSException { + producer.getDeliveryMode(); + } + + @Test(timeout=30000, expected=JMSException.class) + public void testSetPriorityFails() throws JMSException { + producer.setPriority(1); + } + + @Test(timeout=30000, expected=JMSException.class) + public void testGetPriorityFails() throws JMSException { + producer.getPriority(); + } + + @Test(timeout=30000, expected=JMSException.class) + public void testSetTimeToLiveFails() throws JMSException { + producer.setTimeToLive(1); + } + + @Test(timeout=30000, expected=JMSException.class) + public void testGetTimeToLiveFails() throws JMSException { + producer.getTimeToLive(); + } + + @Test(timeout=30000, expected=JMSException.class) + public void testGetDestinationFails() throws JMSException { + producer.getDestination(); + } + + @Test(timeout=30000, expected=JMSException.class) + public void testSendFails() throws JMSException { + producer.send(message); + } + + @Test(timeout=30000, expected=JMSException.class) + public void testSendWithDestinationFails() throws JMSException { + producer.send(destination, message); + } + + @Test(timeout=30000, expected=JMSException.class) + public void testSendWithModePriorityTTLFails() throws JMSException { + producer.send(message, 1, 3, 111); + } + + @Test(timeout=30000, expected=JMSException.class) + public void testSendWithDestinationModePriorityTTLFails() throws JMSException { + producer.send(destination, message, 1, 3, 111); + } +} + http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsMessageProducerFailedTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsMessageProducerFailedTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsMessageProducerFailedTest.java new file mode 100644 index 0000000..b6f5011 --- /dev/null +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsMessageProducerFailedTest.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.producer; + +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.qpid.jms.JmsConnection; +import org.apache.qpid.jms.support.Wait; + +/** + * Tests the MessageProducer method contract when it's connection has failed. + */ +public class JmsMessageProducerFailedTest extends JmsMessageProducerClosedTest { + + @Override + protected MessageProducer createProducer() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + connection = createAmqpConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + message = session.createMessage(); + destination = session.createQueue("test"); + MessageProducer producer = session.createProducer(destination); + connection.setExceptionListener(new ExceptionListener() { + + @Override + public void onException(JMSException exception) { + latch.countDown(); + } + }); + connection.start(); + stopPrimaryBroker(); + assertTrue(latch.await(10, TimeUnit.SECONDS)); + final JmsConnection jmsConnection = (JmsConnection) connection; + assertTrue(Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return !jmsConnection.isConnected(); + } + })); + return producer; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsMessageProducerTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsMessageProducerTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsMessageProducerTest.java new file mode 100644 index 0000000..ceb2d53 --- /dev/null +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsMessageProducerTest.java @@ -0,0 +1,193 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.producer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import javax.jms.DeliveryMode; +import javax.jms.JMSSecurityException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.qpid.jms.support.AmqpTestSupport; +import org.junit.Test; + +/** + * + */ +public class JmsMessageProducerTest extends AmqpTestSupport { + + @Test(timeout = 60000) + public void testCreateMessageProducer() throws Exception { + connection = createAmqpConnection(); + assertNotNull(connection); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + assertNotNull(session); + Queue queue = session.createQueue(name.getMethodName()); + session.createProducer(queue); + + QueueViewMBean proxy = getProxyToQueue(name.getMethodName()); + assertEquals(0, proxy.getQueueSize()); + } + + @Test + public void testSendWorksWhenConnectionNotStarted() throws Exception { + connection = createAmqpConnection(); + assertNotNull(connection); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + assertNotNull(session); + Queue queue = session.createQueue(name.getMethodName()); + MessageProducer producer = session.createProducer(queue); + + QueueViewMBean proxy = getProxyToQueue(name.getMethodName()); + assertEquals(0, proxy.getQueueSize()); + + Message message = session.createMessage(); + producer.send(message); + + assertEquals(1, proxy.getQueueSize()); + } + + @Test + public void testSendWorksAfterConnectionStopped() throws Exception { + connection = createAmqpConnection(); + assertNotNull(connection); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + assertNotNull(session); + Queue queue = session.createQueue(name.getMethodName()); + MessageProducer producer = session.createProducer(queue); + + QueueViewMBean proxy = getProxyToQueue(name.getMethodName()); + assertEquals(0, proxy.getQueueSize()); + connection.stop(); + + Message message = session.createMessage(); + producer.send(message); + + assertEquals(1, proxy.getQueueSize()); + } + + @Test + public void testPersistentSendsAreMarkedPersistent() throws Exception { + connection = createAmqpConnection(); + assertNotNull(connection); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + assertNotNull(session); + Queue queue = session.createQueue(name.getMethodName()); + MessageProducer producer = session.createProducer(queue); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + + QueueViewMBean proxy = getProxyToQueue(name.getMethodName()); + assertEquals(0, proxy.getQueueSize()); + + Message message = session.createMessage(); + producer.send(message); + + assertEquals(1, proxy.getQueueSize()); + + MessageConsumer consumer = session.createConsumer(queue); + message = consumer.receive(5000); + assertNotNull(message); + assertTrue(message.getJMSDeliveryMode() == DeliveryMode.PERSISTENT); + } + + @Test + public void testProducerWithNoTTLSendsMessagesWithoutTTL() throws Exception { + connection = createAmqpConnection(); + assertNotNull(connection); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + assertNotNull(session); + Queue queue = session.createQueue(name.getMethodName()); + MessageProducer producer = session.createProducer(queue); + + QueueViewMBean proxy = getProxyToQueue(name.getMethodName()); + assertEquals(0, proxy.getQueueSize()); + + Message message = session.createMessage(); + producer.send(message); + + assertEquals(1, proxy.getQueueSize()); + + MessageConsumer consumer = session.createConsumer(queue); + message = consumer.receive(5000); + assertNotNull(message); + assertEquals(0, message.getJMSExpiration()); + } + + private String createLargeString(int sizeInBytes) { + byte[] base = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 }; + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < sizeInBytes; i++) { + builder.append(base[i % base.length]); + } + + LOG.debug("Created string with size : " + builder.toString().getBytes().length + " bytes"); + return builder.toString(); + } + + @Test(timeout = 60 * 1000) + public void testSendLargeMessage() throws Exception { + connection = createAmqpConnection(); + assertNotNull(connection); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + String queueName = name.toString(); + Queue queue = session.createQueue(queueName); + + MessageProducer producer = session.createProducer(queue); + int messageSize = 1024 * 1024; + String messageText = createLargeString(messageSize); + Message m = session.createTextMessage(messageText); + LOG.debug("Sending message of {} bytes on queue {}", messageSize, queueName); + producer.send(m); + + MessageConsumer consumer = session.createConsumer(queue); + + Message message = consumer.receive(); + assertNotNull(message); + assertTrue(message instanceof TextMessage); + TextMessage textMessage = (TextMessage) message; + LOG.debug(">>>> Received message of length {}", textMessage.getText().length()); + assertEquals(messageSize, textMessage.getText().length()); + assertEquals(messageText, textMessage.getText()); + } + + @Test(timeout=90000, expected=JMSSecurityException.class) + public void testProducerNotAuthorized() throws Exception{ + connection = createAmqpConnection("guest", "password"); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("USERS." + name.getMethodName()); + session.createProducer(queue); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsProduceMessageTypesTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsProduceMessageTypesTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsProduceMessageTypesTest.java new file mode 100644 index 0000000..5ba9f9e --- /dev/null +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsProduceMessageTypesTest.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.producer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import javax.jms.BytesMessage; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.StreamMessage; +import javax.jms.TextMessage; + +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.qpid.jms.support.AmqpTestSupport; +import org.junit.Test; + +/** + * Test basic MessageProducer functionality. + */ +public class JmsProduceMessageTypesTest extends AmqpTestSupport { + + @Test(timeout = 60000) + public void testSendJMSMessage() throws Exception { + connection = createAmqpConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + assertNotNull(session); + Queue queue = session.createQueue(name.getMethodName()); + MessageProducer producer = session.createProducer(queue); + Message message = session.createMessage(); + producer.send(message); + QueueViewMBean proxy = getProxyToQueue(name.getMethodName()); + assertEquals(1, proxy.getQueueSize()); + } + + @Test(timeout = 60000) + public void testSendJMSBytesMessage() throws Exception { + connection = createAmqpConnection(); + connection.start(); + + String payload = "TEST"; + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + assertNotNull(session); + Queue queue = session.createQueue(name.getMethodName()); + MessageProducer producer = session.createProducer(queue); + BytesMessage message = session.createBytesMessage(); + message.writeUTF(payload); + producer.send(message); + QueueViewMBean proxy = getProxyToQueue(name.getMethodName()); + assertEquals(1, proxy.getQueueSize()); + + MessageConsumer consumer = session.createConsumer(queue); + Message received = consumer.receive(5000); + assertNotNull(received); + assertTrue(received instanceof BytesMessage); + BytesMessage bytes = (BytesMessage) received; + assertEquals(payload, bytes.readUTF()); + } + + @Test(timeout = 60000) + public void testSendJMSMapMessage() throws Exception { + connection = createAmqpConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + assertNotNull(session); + Queue queue = session.createQueue(name.getMethodName()); + MessageProducer producer = session.createProducer(queue); + MapMessage message = session.createMapMessage(); + message.setBoolean("Boolean", false); + message.setString("STRING", "TEST"); + producer.send(message); + QueueViewMBean proxy = getProxyToQueue(name.getMethodName()); + assertEquals(1, proxy.getQueueSize()); + + MessageConsumer consumer = session.createConsumer(queue); + Message received = consumer.receive(5000); + assertNotNull(received); + assertTrue(received instanceof MapMessage); + MapMessage map = (MapMessage) received; + assertEquals("TEST", map.getString("STRING")); + assertEquals(false, map.getBooleanProperty("Boolean")); + } + + @Test(timeout = 60000) + public void testSendJMSStreamMessage() throws Exception { + connection = createAmqpConnection(); + connection.start(); + + String payload = "TEST"; + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + assertNotNull(session); + Queue queue = session.createQueue(name.getMethodName()); + MessageProducer producer = session.createProducer(queue); + StreamMessage message = session.createStreamMessage(); + message.writeString(payload); + producer.send(message); + QueueViewMBean proxy = getProxyToQueue(name.getMethodName()); + assertEquals(1, proxy.getQueueSize()); + + MessageConsumer consumer = session.createConsumer(queue); + Message received = consumer.receive(5000); + assertNotNull(received); + assertTrue(received instanceof StreamMessage); + StreamMessage stream = (StreamMessage) received; + assertEquals(payload, stream.readString()); + } + + @Test(timeout = 60000) + public void testSendJMSTextMessage() throws Exception { + connection = createAmqpConnection(); + connection.start(); + + String payload = "TEST"; + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + assertNotNull(session); + Queue queue = session.createQueue(name.getMethodName()); + MessageProducer producer = session.createProducer(queue); + TextMessage message = session.createTextMessage("TEST"); + producer.send(message); + QueueViewMBean proxy = getProxyToQueue(name.getMethodName()); + assertEquals(1, proxy.getQueueSize()); + + MessageConsumer consumer = session.createConsumer(queue); + Message received = consumer.receive(5000); + assertNotNull(received); + assertTrue(received instanceof TextMessage); + TextMessage text = (TextMessage) received; + assertEquals(payload, text.getText()); + } + + @Test(timeout = 60000) + public void testSendJMSObjectMessage() throws Exception { + connection = createAmqpConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + assertNotNull(session); + Queue queue = session.createQueue(name.getMethodName()); + MessageProducer producer = session.createProducer(queue); + ObjectMessage message = session.createObjectMessage("TEST"); + producer.send(message); + QueueViewMBean proxy = getProxyToQueue(name.getMethodName()); + assertEquals(1, proxy.getQueueSize()); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsQueueSenderTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsQueueSenderTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsQueueSenderTest.java new file mode 100644 index 0000000..e4c0910 --- /dev/null +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsQueueSenderTest.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.producer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import javax.jms.Queue; +import javax.jms.QueueConnection; +import javax.jms.QueueSender; +import javax.jms.QueueSession; +import javax.jms.Session; + +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.qpid.jms.JmsConnectionFactory; +import org.apache.qpid.jms.support.AmqpTestSupport; +import org.junit.Test; + +/** + * Test basic QueueSender functionality. + */ +public class JmsQueueSenderTest extends AmqpTestSupport { + + @Test + public void testCreateQueueSender() throws Exception { + JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerAmqpConnectionURI()); + QueueConnection connection = factory.createQueueConnection(); + assertNotNull(connection); + + QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); + assertNotNull(session); + Queue queue = session.createQueue(name.getMethodName()); + QueueSender sender = session.createSender(queue); + assertNotNull(sender); + + QueueViewMBean proxy = getProxyToQueue(name.getMethodName()); + assertEquals(0, proxy.getQueueSize()); + connection.close(); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsTopicPublisherTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsTopicPublisherTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsTopicPublisherTest.java new file mode 100644 index 0000000..80c4215 --- /dev/null +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/producer/JmsTopicPublisherTest.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.producer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import javax.jms.Session; +import javax.jms.Topic; +import javax.jms.TopicConnection; +import javax.jms.TopicPublisher; +import javax.jms.TopicSession; + +import org.apache.activemq.broker.jmx.TopicViewMBean; +import org.apache.qpid.jms.JmsConnectionFactory; +import org.apache.qpid.jms.support.AmqpTestSupport; +import org.junit.Test; + +/** + * test basic TopicPublisher functionality. + */ +public class JmsTopicPublisherTest extends AmqpTestSupport { + + @Test + public void testCreateTopicPublisher() throws Exception { + JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerAmqpConnectionURI()); + TopicConnection connection = factory.createTopicConnection(); + assertNotNull(connection); + + TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + assertNotNull(session); + Topic topic = session.createTopic(name.getMethodName()); + TopicPublisher publisher = session.createPublisher(topic); + assertNotNull(publisher); + + TopicViewMBean proxy = getProxyToTopic(name.getMethodName()); + assertEquals(0, proxy.getEnqueueCount()); + connection.close(); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/support/AmqpTestSupport.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/support/AmqpTestSupport.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/support/AmqpTestSupport.java new file mode 100644 index 0000000..0e1d247 --- /dev/null +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/support/AmqpTestSupport.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.support; + +import java.net.URI; +import java.util.Map; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.qpid.jms.JmsConnectionFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AmqpTestSupport extends QpidJmsTestSupport { + + protected static final Logger LOG = LoggerFactory.getLogger(AmqpTestSupport.class); + + protected boolean isAmqpDiscovery() { + return false; + } + + protected String getAmqpTransformer() { + return "jms"; + } + + protected int getSocketBufferSize() { + return 64 * 1024; + } + + protected int getIOBufferSize() { + return 8 * 1024; + } + + @Override + protected void addAdditionalConnectors(BrokerService brokerService, Map<String, Integer> portMap) throws Exception { + int port = 0; + if (portMap.containsKey("amqp")) { + port = portMap.get("amqp"); + } + TransportConnector connector = brokerService.addConnector( + "amqp://0.0.0.0:" + port + "?transport.transformer=" + getAmqpTransformer() + + "&transport.socketBufferSize=" + getSocketBufferSize() + "&ioBufferSize=" + getIOBufferSize()); + connector.setName("amqp"); + if (isAmqpDiscovery()) { + connector.setDiscoveryUri(new URI("multicast://default")); + } + port = connector.getPublishableConnectURI().getPort(); + LOG.debug("Using amqp port: {}", port); + } + + public String getAmqpConnectionURIOptions() { + return ""; + } + + public URI getBrokerAmqpConnectionURI() { + try { + String uri = "amqp://127.0.0.1:" + + brokerService.getTransportConnectorByName("amqp").getPublishableConnectURI().getPort(); + + if (!getAmqpConnectionURIOptions().isEmpty()) { + uri = uri + "?" + getAmqpConnectionURIOptions(); + } + + return new URI(uri); + } catch (Exception e) { + throw new RuntimeException(); + } + } + + public String getAmqpFailoverURI() throws Exception { + StringBuilder uri = new StringBuilder(); + uri.append("failover://("); + uri.append(brokerService.getTransportConnectorByName("amqp").getPublishableConnectString()); + + for (BrokerService broker : brokers) { + uri.append(","); + uri.append(broker.getTransportConnectorByName("amqp").getPublishableConnectString()); + } + + uri.append(")"); + + return uri.toString(); + } + + public Connection createAmqpConnection() throws Exception { + return createAmqpConnection(getBrokerAmqpConnectionURI()); + } + + public Connection createAmqpConnection(String username, String password) throws Exception { + return createAmqpConnection(getBrokerAmqpConnectionURI(), username, password); + } + + public Connection createAmqpConnection(URI brokerURI) throws Exception { + return createAmqpConnection(brokerURI, null, null); + } + + public Connection createAmqpConnection(URI brokerURI, String username, String password) throws Exception { + ConnectionFactory factory = createAmqpConnectionFactory(brokerURI, username, password); + return factory.createConnection(); + } + + public ConnectionFactory createAmqpConnectionFactory() throws Exception { + return createAmqpConnectionFactory(getBrokerAmqpConnectionURI(), null, null); + } + + public ConnectionFactory createAmqpConnectionFactory(URI brokerURI) throws Exception { + return createAmqpConnectionFactory(brokerURI, null, null); + } + + public ConnectionFactory createAmqpConnectionFactory(String username, String password) throws Exception { + return createAmqpConnectionFactory(getBrokerAmqpConnectionURI(), username, password); + } + + public ConnectionFactory createAmqpConnectionFactory(URI brokerURI, String username, String password) throws Exception { + JmsConnectionFactory factory = new JmsConnectionFactory(brokerURI); + factory.setForceAsyncSend(isForceAsyncSends()); + factory.setAlwaysSyncSend(isAlwaysSyncSend()); + factory.setMessagePrioritySupported(isMessagePrioritySupported()); + factory.setSendAcksAsync(isSendAcksAsync()); + if (username != null) { + factory.setUsername(username); + } + if (password != null) { + factory.setPassword(password); + } + return factory; + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
