Repository: qpid-jms Updated Branches: refs/heads/master 99f60c210 -> e4decdc1e
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/support/QpidJmsTestSupport.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/support/QpidJmsTestSupport.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/support/QpidJmsTestSupport.java new file mode 100644 index 0000000..abb1bb4 --- /dev/null +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/support/QpidJmsTestSupport.java @@ -0,0 +1,447 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.qpid.jms.support; + +import java.io.File; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Vector; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.jmx.BrokerViewMBean; +import org.apache.activemq.broker.jmx.ConnectorViewMBean; +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.broker.jmx.TopicViewMBean; +import org.apache.activemq.filter.DestinationMapEntry; +import org.apache.activemq.security.AuthenticationUser; +import org.apache.activemq.security.AuthorizationEntry; +import org.apache.activemq.security.AuthorizationPlugin; +import org.apache.activemq.security.DefaultAuthorizationMap; +import org.apache.activemq.security.SimpleAuthenticationPlugin; +import org.apache.activemq.security.TempDestinationAuthorizationEntry; +import org.apache.activemq.store.kahadb.KahaDBStore; +import org.apache.activemq.util.JMXSupport; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Base class for all protocol test support classes. 
+ */ +public class QpidJmsTestSupport { + + public static final String KAHADB_DIRECTORY = "target/activemq-data"; + + @Rule public TestName name = new TestName(); + + protected static final Logger LOG = LoggerFactory.getLogger(QpidJmsTestSupport.class); + protected BrokerService brokerService; + protected final List<BrokerService> brokers = new ArrayList<BrokerService>(); + protected final Vector<Throwable> exceptions = new Vector<Throwable>(); + protected int numberOfMessages; + protected Connection connection; + + @Before + public void setUp() throws Exception { + exceptions.clear(); + startPrimaryBroker(); + this.numberOfMessages = 2000; + } + + @After + public void tearDown() throws Exception { + if (connection != null) { + connection.close(); + } + stopPrimaryBroker(); + for (BrokerService broker : brokers) { + try { + stopBroker(broker); + } catch (Exception ex) {} + } + } + + public String getDestinationName() { + return name.getMethodName(); + } + + public URI getBrokerActiveMQClientConnectionURI() { + try { + return new URI("tcp://127.0.0.1:" + + brokerService.getTransportConnectorByName("openwire").getPublishableConnectURI().getPort()); + } catch (Exception e) { + throw new RuntimeException(); + } + } + + protected boolean isPersistent() { + return false; + } + + protected boolean isAdvisorySupport() { + return false; + } + + protected BrokerService createBroker(String name, boolean deleteAllMessages) throws Exception { + return createBroker(name, deleteAllMessages, Collections.<String, Integer> emptyMap()); + } + + protected void configureBrokerPolicies(BrokerService broker) { + + } + + protected BrokerService createBroker(String name, boolean deleteAllMessages, Map<String, Integer> portMap) throws Exception { + KahaDBStore kaha = new KahaDBStore(); + kaha.setDirectory(new File(KAHADB_DIRECTORY + "/" + name)); + + BrokerService brokerService = new BrokerService(); + brokerService.setBrokerName(name); + brokerService.setPersistent(isPersistent()); + 
brokerService.setAdvisorySupport(isAdvisorySupport()); + brokerService.setDeleteAllMessagesOnStartup(deleteAllMessages); + brokerService.setUseJmx(true); + brokerService.setDataDirectory("target/" + name); + brokerService.setPersistenceAdapter(kaha); + brokerService.setStoreOpenWireVersion(10); + + configureBrokerPolicies(brokerService); + + ArrayList<BrokerPlugin> plugins = new ArrayList<BrokerPlugin>(); + BrokerPlugin authenticationPlugin = configureAuthentication(); + if (authenticationPlugin != null) { + plugins.add(configureAuthorization()); + } + + BrokerPlugin authorizationPlugin = configureAuthorization(); + if (authorizationPlugin != null) { + plugins.add(configureAuthentication()); + } + + if (!plugins.isEmpty()) { + BrokerPlugin[] array = new BrokerPlugin[plugins.size()]; + brokerService.setPlugins(plugins.toArray(array)); + } + + addOpenWireConnector(brokerService, portMap); + addAdditionalConnectors(brokerService, portMap); + + return brokerService; + } + + protected int addOpenWireConnector(BrokerService brokerService, Map<String, Integer> portMap) throws Exception { + int port = 0; + if (portMap.containsKey("openwire")) { + port = portMap.get("openwire"); + } + TransportConnector connector = brokerService.addConnector("tcp://0.0.0.0:" + port + "?trace=true"); + connector.setName("openwire"); + int openwirePort = connector.getPublishableConnectURI().getPort(); + LOG.debug("Using openwire port: {}", openwirePort); + return openwirePort; + } + + protected void addAdditionalConnectors(BrokerService brokerService, Map<String, Integer> portMap) throws Exception { + // Subclasses can add their own connectors, we don't add any here. 
+ } + + public void startPrimaryBroker() throws Exception { + if (brokerService != null && brokerService.isStarted()) { + throw new IllegalStateException("Broker is already created."); + } + + brokerService = createBroker("localhost", true); + brokerService.start(); + brokerService.waitUntilStarted(); + } + + public void restartPrimaryBroker() throws Exception { + stopBroker(brokerService); + brokerService = restartBroker(brokerService); + } + + public void stopPrimaryBroker() throws Exception { + stopBroker(brokerService); + } + + public void startNewBroker() throws Exception { + String brokerName = "localhost" + (brokers.size() + 1); + + BrokerService brokerService = createBroker(brokerName, true); + brokerService.setUseJmx(false); + brokerService.start(); + brokerService.waitUntilStarted(); + + brokers.add(brokerService); + } + + public BrokerService restartBroker(BrokerService brokerService) throws Exception { + String name = brokerService.getBrokerName(); + Map<String, Integer> portMap = new HashMap<String, Integer>(); + for (TransportConnector connector : brokerService.getTransportConnectors()) { + portMap.put(connector.getName(), connector.getPublishableConnectURI().getPort()); + } + + stopBroker(brokerService); + BrokerService broker = createBroker(name, false, portMap); + broker.start(); + broker.waitUntilStarted(); + return broker; + } + + public void stopBroker(BrokerService broker) throws Exception { + if (broker != null) { + broker.stop(); + broker.waitUntilStopped(); + } + } + + public List<URI> getBrokerURIs() throws Exception { + ArrayList<URI> result = new ArrayList<URI>(); + result.add(brokerService.getTransportConnectorByName("amqp").getPublishableConnectURI()); + + for (BrokerService broker : brokers) { + result.add(broker.getTransportConnectorByName("amqp").getPublishableConnectURI()); + } + + return result; + } + + public Connection createActiveMQConnection() throws Exception { + return 
createActiveMQConnection(getBrokerActiveMQClientConnectionURI()); + } + + public Connection createActiveMQConnection(URI brokerURI) throws Exception { + ConnectionFactory factory = new ActiveMQConnectionFactory(brokerURI); + return factory.createConnection(); + } + + public void sendMessages(Connection connection, Destination destination, int count) throws Exception { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer p = session.createProducer(destination); + + for (int i = 0; i < count; i++) { + TextMessage message = session.createTextMessage(); + message.setText("TextMessage: " + i); + p.send(message); + } + + session.close(); + } + + protected BrokerViewMBean getProxyToBroker() throws MalformedObjectNameException, JMSException { + return getProxyToBroker(brokerService); + } + + protected BrokerViewMBean getProxyToBroker(BrokerService broker) throws MalformedObjectNameException, JMSException { + ObjectName brokerViewMBean = broker.getBrokerObjectName(); + BrokerViewMBean proxy = (BrokerViewMBean) brokerService.getManagementContext() + .newProxyInstance(brokerViewMBean, BrokerViewMBean.class, true); + return proxy; + } + + protected ConnectorViewMBean getProxyToConnectionView(String connectionType) throws Exception { + return getProxyToConnectionView(connectionType); + } + + protected ConnectorViewMBean getProxyToConnectionView(BrokerService broker, String connectionType) throws Exception { + ObjectName connectorQuery = new ObjectName( + broker.getBrokerObjectName() + ",connector=clientConnectors,connectorName="+connectionType+"_//*"); + + Set<ObjectName> results = brokerService.getManagementContext().queryNames(connectorQuery, null); + if (results == null || results.isEmpty() || results.size() > 1) { + throw new Exception("Unable to find the exact Connector instance."); + } + + ConnectorViewMBean proxy = (ConnectorViewMBean) brokerService.getManagementContext() + .newProxyInstance(results.iterator().next(), 
ConnectorViewMBean.class, true); + return proxy; + } + + protected QueueViewMBean getProxyToQueue(String name) throws MalformedObjectNameException, JMSException { + return getProxyToQueue(brokerService, name); + } + + protected QueueViewMBean getProxyToQueue(BrokerService broker, String name) throws MalformedObjectNameException, JMSException { + ObjectName queueViewMBeanName = new ObjectName( + broker.getBrokerObjectName() + ",destinationType=Queue,destinationName=" + name); + QueueViewMBean proxy = (QueueViewMBean) brokerService.getManagementContext() + .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true); + return proxy; + } + + protected QueueViewMBean getProxyToTemporaryQueue(String name) throws MalformedObjectNameException, JMSException { + return getProxyToTemporaryQueue(brokerService, name); + } + + protected QueueViewMBean getProxyToTemporaryQueue(BrokerService broker, String name) throws MalformedObjectNameException, JMSException { + name = JMXSupport.encodeObjectNamePart(name); + ObjectName queueViewMBeanName = new ObjectName( + broker.getBrokerObjectName() + ",destinationType=TempQueue,destinationName=" + name); + QueueViewMBean proxy = (QueueViewMBean) brokerService.getManagementContext() + .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true); + return proxy; + } + + protected TopicViewMBean getProxyToTopic(String name) throws MalformedObjectNameException, JMSException { + return getProxyToTopic(brokerService, name); + } + + protected TopicViewMBean getProxyToTopic(BrokerService broker, String name) throws MalformedObjectNameException, JMSException { + ObjectName topicViewMBeanName = new ObjectName( + broker.getBrokerObjectName() + ",destinationType=Topic,destinationName=" + name); + TopicViewMBean proxy = (TopicViewMBean) brokerService.getManagementContext() + .newProxyInstance(topicViewMBeanName, TopicViewMBean.class, true); + return proxy; + } + + protected TopicViewMBean getProxyToTemporaryTopic(String name) throws 
MalformedObjectNameException, JMSException { + return getProxyToTemporaryTopic(brokerService, name); + } + + protected TopicViewMBean getProxyToTemporaryTopic(BrokerService broker, String name) throws MalformedObjectNameException, JMSException { + name = JMXSupport.encodeObjectNamePart(name); + ObjectName topicViewMBeanName = new ObjectName( + broker.getBrokerObjectName() + ",destinationType=TempTopic,destinationName=" + name); + TopicViewMBean proxy = (TopicViewMBean) brokerService.getManagementContext() + .newProxyInstance(topicViewMBeanName, TopicViewMBean.class, true); + return proxy; + } + + protected void sendToAmqQueue(int count) throws Exception { + Connection activemqConnection = createActiveMQConnection(); + Session amqSession = activemqConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue amqTestQueue = amqSession.createQueue(name.getMethodName()); + sendMessages(activemqConnection, amqTestQueue, count); + } + + protected void sendToAmqTopic(int count) throws Exception { + Connection activemqConnection = createActiveMQConnection(); + Session amqSession = activemqConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic amqTestTopic = amqSession.createTopic(name.getMethodName()); + sendMessages(activemqConnection, amqTestTopic, count); + } + + protected BrokerPlugin configureAuthentication() throws Exception { + List<AuthenticationUser> users = new ArrayList<AuthenticationUser>(); + users.add(new AuthenticationUser("system", "manager", "users,admins")); + users.add(new AuthenticationUser("user", "password", "users")); + users.add(new AuthenticationUser("guest", "password", "guests")); + SimpleAuthenticationPlugin authenticationPlugin = new SimpleAuthenticationPlugin(users); + authenticationPlugin.setAnonymousAccessAllowed(true); + + return authenticationPlugin; + } + + protected BrokerPlugin configureAuthorization() throws Exception { + + @SuppressWarnings("rawtypes") + List<DestinationMapEntry> authorizationEntries = new 
ArrayList<DestinationMapEntry>(); + + AuthorizationEntry entry = new AuthorizationEntry(); + entry.setQueue(">"); + entry.setRead("admins,anonymous"); + entry.setWrite("admins,anonymous"); + entry.setAdmin("admins,anonymous"); + authorizationEntries.add(entry); + entry = new AuthorizationEntry(); + entry.setQueue("USERS.>"); + entry.setRead("users,anonymous"); + entry.setWrite("users,anonymous"); + entry.setAdmin("users,anonymous"); + authorizationEntries.add(entry); + entry = new AuthorizationEntry(); + entry.setQueue("GUEST.>"); + entry.setRead("guests,anonymous"); + entry.setWrite("guests,users,anonymous"); + entry.setAdmin("guests,users,anonymous"); + authorizationEntries.add(entry); + entry = new AuthorizationEntry(); + entry.setTopic(">"); + entry.setRead("admins,anonymous"); + entry.setWrite("admins,anonymous"); + entry.setAdmin("admins,anonymous"); + authorizationEntries.add(entry); + entry = new AuthorizationEntry(); + entry.setTopic("USERS.>"); + entry.setRead("users,anonymous"); + entry.setWrite("users,anonymous"); + entry.setAdmin("users,anonymous"); + authorizationEntries.add(entry); + entry = new AuthorizationEntry(); + entry.setTopic("GUEST.>"); + entry.setRead("guests,anonymous"); + entry.setWrite("guests,users,anonymous"); + entry.setAdmin("guests,users,anonymous"); + authorizationEntries.add(entry); + entry = new AuthorizationEntry(); + entry.setTopic("ActiveMQ.Advisory.>"); + entry.setRead("guests,users,anonymous"); + entry.setWrite("guests,users,anonymous"); + entry.setAdmin("guests,users,anonymous"); + authorizationEntries.add(entry); + + TempDestinationAuthorizationEntry tempEntry = new TempDestinationAuthorizationEntry(); + tempEntry.setRead("admins,anonymous"); + tempEntry.setWrite("admins,anonymous"); + tempEntry.setAdmin("admins,anonymous"); + + DefaultAuthorizationMap authorizationMap = new DefaultAuthorizationMap(authorizationEntries); + authorizationMap.setTempDestinationAuthorizationEntry(tempEntry); + AuthorizationPlugin 
authorizationPlugin = new AuthorizationPlugin(authorizationMap); + + return authorizationPlugin; + } + + protected boolean isForceAsyncSends() { + return false; + } + + protected boolean isAlwaysSyncSend() { + return false; + } + + protected boolean isMessagePrioritySupported() { + return true; + } + + protected boolean isSendAcksAsync() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/support/Wait.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/support/Wait.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/support/Wait.java new file mode 100644 index 0000000..10f511e --- /dev/null +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/support/Wait.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Small polling helper for tests: repeatedly evaluates a {@link Condition} until it holds or
 * a time budget is used up.
 */
public class Wait {

    /** Default overall time budget in milliseconds. */
    public static final long MAX_WAIT_MILLIS = 30 * 1000;

    /** Default pause between two evaluations of the condition, in milliseconds. */
    public static final int SLEEP_MILLIS = 1000;

    /** A check that is polled until it reports success. */
    public interface Condition {
        boolean isSatisified() throws Exception;
    }

    /**
     * Polls the condition with the default budget ({@link #MAX_WAIT_MILLIS}) and the default
     * pause ({@link #SLEEP_MILLIS}).
     */
    public static boolean waitFor(Condition condition) throws Exception {
        return waitFor(condition, MAX_WAIT_MILLIS);
    }

    /**
     * Polls the condition for at most {@code duration} milliseconds with the default pause
     * ({@link #SLEEP_MILLIS}).
     */
    public static boolean waitFor(final Condition condition, final long duration) throws Exception {
        return waitFor(condition, duration, SLEEP_MILLIS);
    }

    /**
     * Polls the condition until it is satisfied or {@code duration} milliseconds have passed.
     *
     * <p>The condition is always evaluated at least once. Each pause is capped to the time
     * left in the budget, so the call does not overrun {@code duration} by a whole
     * {@code sleepMillis} interval when the interval is longer than the remaining time.
     *
     * @param condition the check to poll.
     * @param duration overall time budget in milliseconds.
     * @param sleepMillis pause between evaluations in milliseconds; values of zero or less
     *        poll without pausing.
     * @return the result of the last evaluation of the condition.
     * @throws Exception whatever the condition throws, or {@link InterruptedException} if the
     *         thread is interrupted while pausing.
     */
    public static boolean waitFor(final Condition condition, final long duration, final int sleepMillis) throws Exception {

        final long expiry = System.currentTimeMillis() + duration;
        boolean conditionSatisified = condition.isSatisified();
        while (!conditionSatisified) {
            final long remaining = expiry - System.currentTimeMillis();
            if (remaining <= 0) {
                break;
            }

            final long pause = Math.min((long) sleepMillis, remaining);
            if (pause > 0) {
                Thread.sleep(pause);
            }
            conditionSatisified = condition.isSatisified();
        }
        return conditionSatisified;
    }
}
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.transactions; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.qpid.jms.support.AmqpTestSupport; +import org.junit.Test; + +/** + * Test consumer behavior for Transacted Session Consumers. 
+ */ +public class JmsTransactedConsumerTest extends AmqpTestSupport { + + @Test(timeout = 60000) + public void testCreateConsumerFromTxSession() throws Exception { + connection = createAmqpConnection(); + connection.start(); + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + assertNotNull(session); + assertTrue(session.getTransacted()); + + Queue queue = session.createQueue(name.getMethodName()); + MessageConsumer consumer = session.createConsumer(queue); + assertNotNull(consumer); + } + + @Test(timeout = 60000) + public void testConsumedInTxAreAcked() throws Exception { + connection = createAmqpConnection(); + connection.start(); + + sendToAmqQueue(1); + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue(name.getMethodName()); + MessageConsumer consumer = session.createConsumer(queue); + + Message message = consumer.receive(5000); + assertNotNull(message); + + QueueViewMBean proxy = getProxyToQueue(name.getMethodName()); + assertEquals(1, proxy.getQueueSize()); + + session.commit(); + + assertEquals(0, proxy.getQueueSize()); + } + + @Test(timeout = 60000) + public void testReceiveAndRollback() throws Exception { + connection = createAmqpConnection(); + connection.start(); + + sendToAmqQueue(2); + + QueueViewMBean proxy = getProxyToQueue(name.getMethodName()); + assertEquals(2, proxy.getQueueSize()); + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue(name.getMethodName()); + MessageConsumer consumer = session.createConsumer(queue); + + Message message = consumer.receive(1000); + assertNotNull(message); + session.commit(); + + assertEquals(1, proxy.getQueueSize()); + + // rollback so we can get that last message again. + message = consumer.receive(1000); + assertNotNull(message); + session.rollback(); + + assertEquals(1, proxy.getQueueSize()); + + // Consume again.. 
the prev message should get redelivered. + message = consumer.receive(5000); + assertNotNull("Should have re-received the message again!", message); + session.commit(); + + assertEquals(0, proxy.getQueueSize()); + } + + @Test(timeout = 60000) + public void testReceiveTwoThenRollback() throws Exception { + connection = createAmqpConnection(); + connection.start(); + + sendToAmqQueue(2); + + QueueViewMBean proxy = getProxyToQueue(name.getMethodName()); + assertEquals(2, proxy.getQueueSize()); + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue(name.getMethodName()); + MessageConsumer consumer = session.createConsumer(queue); + + Message message = consumer.receive(1000); + assertNotNull(message); + message = consumer.receive(1000); + assertNotNull(message); + session.rollback(); + + assertEquals(2, proxy.getQueueSize()); + + // Consume again.. the prev message should get redelivered. + message = consumer.receive(5000); + assertNotNull("Should have re-received the message again!", message); + message = consumer.receive(5000); + assertNotNull("Should have re-received the message again!", message); + session.commit(); + + assertEquals(0, proxy.getQueueSize()); + } + + @Test(timeout = 60000) + public void testCloseConsumerBeforeCommit() throws Exception { + connection = createAmqpConnection(); + connection.start(); + + sendToAmqQueue(2); + + QueueViewMBean proxy = getProxyToQueue(name.getMethodName()); + assertEquals(2, proxy.getQueueSize()); + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue(name.getMethodName()); + MessageConsumer consumer = session.createConsumer(queue); + TextMessage message = (TextMessage) consumer.receive(5000); + assertNotNull(message); + consumer.close(); + + assertEquals(2, proxy.getQueueSize()); + session.commit(); + assertEquals(1, proxy.getQueueSize()); + + // Create a new consumer + consumer = 
session.createConsumer(queue); + message = (TextMessage) consumer.receive(1000); + session.commit(); + + assertEquals(0, proxy.getQueueSize()); + } + + @Test(timeout=60000) + public void testJMSXDeliveryCount() throws Exception { + sendToAmqQueue(1); + + connection = createAmqpConnection(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + assertEquals(true, session.getTransacted()); + Queue queue = session.createQueue(name.getMethodName()); + MessageConsumer consumer = session.createConsumer(queue); + connection.start(); + + // we receive a message...it should be delivered once and not be Re-delivered. + Message message = consumer.receive(5000); + assertNotNull(message); + assertEquals(false, message.getJMSRedelivered()); + int jmsxDeliveryCount = message.getIntProperty("JMSXDeliveryCount"); + LOG.info("Incoming message has delivery count: {}", jmsxDeliveryCount); + assertEquals(1, jmsxDeliveryCount); + session.rollback(); + + // we receive again a message + message = consumer.receive(5000); + assertNotNull(message); + assertEquals(true, message.getJMSRedelivered()); + jmsxDeliveryCount = message.getIntProperty("JMSXDeliveryCount"); + LOG.info("Redelivered message has delivery count: {}", jmsxDeliveryCount); + assertEquals(2, jmsxDeliveryCount); + session.rollback(); + + // we receive again a message + message = consumer.receive(5000); + assertNotNull(message); + assertEquals(true, message.getJMSRedelivered()); + jmsxDeliveryCount = message.getIntProperty("JMSXDeliveryCount"); + LOG.info("Redelivered message has delivery count: {}", jmsxDeliveryCount); + assertEquals(3, jmsxDeliveryCount); + session.commit(); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedProducerTest.java ---------------------------------------------------------------------- diff --git 
a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedProducerTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedProducerTest.java new file mode 100644 index 0000000..e7f0361 --- /dev/null +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedProducerTest.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.transactions; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.qpid.jms.support.AmqpTestSupport; +import org.junit.Test; + +/** + * Test for messages produced inside a local transaction. 
+ */ +public class JmsTransactedProducerTest extends AmqpTestSupport { + + @Test(timeout = 60000) + public void testCreateTxSessionAndProducer() throws Exception { + connection = createAmqpConnection(); + assertNotNull(connection); + connection.start(); + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + assertNotNull(session); + assertTrue(session.getTransacted()); + + Queue queue = session.createQueue(name.getMethodName()); + MessageProducer producer = session.createProducer(queue); + assertNotNull(producer); + } + + @Test(timeout = 60000) + public void testTXProducerCommitsAreQueued() throws Exception { + final int MSG_COUNT = 10; + connection = createAmqpConnection(); + connection.start(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Session nonTxSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(name.getMethodName()); + MessageConsumer consumer = nonTxSession.createConsumer(queue); + MessageProducer producer = session.createProducer(queue); + + for (int i = 0; i < MSG_COUNT; ++i) { + producer.send(session.createTextMessage()); + } + + Message msg = consumer.receive(2000); + assertNull(msg); + + QueueViewMBean proxy = getProxyToQueue(name.getMethodName()); + session.commit(); + assertEquals(MSG_COUNT, proxy.getQueueSize()); + } + + @Test(timeout = 60000) + public void testTXProducerRollbacksNotQueued() throws Exception { + final int MSG_COUNT = 10; + connection = createAmqpConnection(); + connection.start(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue(name.getMethodName()); + MessageProducer producer = session.createProducer(queue); + + for (int i = 0; i < MSG_COUNT; ++i) { + producer.send(session.createTextMessage()); + } + + QueueViewMBean proxy = getProxyToQueue(name.getMethodName()); + session.rollback(); + assertEquals(0, proxy.getQueueSize()); + } +} 
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedSessionTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedSessionTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedSessionTest.java new file mode 100644 index 0000000..3dd0647 --- /dev/null +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedSessionTest.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.qpid.jms.transactions; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.qpid.jms.support.AmqpTestSupport; +import org.junit.Test; + +/** + * Basic tests for Session in Transacted mode. + */ +public class JmsTransactedSessionTest extends AmqpTestSupport { + + @Test(timeout = 60000) + public void testCreateTxSession() throws Exception { + connection = createAmqpConnection(); + assertNotNull(connection); + connection.start(); + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + assertNotNull(session); + assertTrue(session.getTransacted()); + + session.close(); + } + + @Test(timeout = 60000) + public void testCommitOnSessionWithNoWork() throws Exception { + connection = createAmqpConnection(); + assertNotNull(connection); + connection.start(); + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + assertNotNull(session); + assertTrue(session.getTransacted()); + + session.commit(); + } + + @Test(timeout = 60000) + public void testRollbackOnSessionWithNoWork() throws Exception { + connection = createAmqpConnection(); + assertNotNull(connection); + connection.start(); + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + assertNotNull(session); + assertTrue(session.getTransacted()); + + session.rollback(); + } + + @Test(timeout=60000) + public void testCloseSessionRollsBack() throws Exception { + connection = createAmqpConnection(); + connection.start(); + + sendToAmqQueue(2); + + QueueViewMBean proxy = getProxyToQueue(name.getMethodName()); + assertEquals(2, proxy.getQueueSize()); + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = 
session.createQueue(name.getMethodName()); + MessageConsumer consumer = session.createConsumer(queue); + Message message = consumer.receive(5000); + assertNotNull(message); + message = consumer.receive(5000); + assertNotNull(message); + + assertEquals(2, proxy.getQueueSize()); + session.close(); + assertEquals(2, proxy.getQueueSize()); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/usecases/JmsLargeMessageSendRecvTimedTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/usecases/JmsLargeMessageSendRecvTimedTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/usecases/JmsLargeMessageSendRecvTimedTest.java new file mode 100644 index 0000000..00a1388 --- /dev/null +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/usecases/JmsLargeMessageSendRecvTimedTest.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.qpid.jms.usecases; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.Random; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.qpid.jms.support.AmqpTestSupport; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class JmsLargeMessageSendRecvTimedTest extends AmqpTestSupport { + + protected static final Logger LOG = LoggerFactory.getLogger(JmsLargeMessageSendRecvTimedTest.class); + + private final Random rand = new Random(System.currentTimeMillis()); + + private byte[] createLargePayload(int sizeInBytes) { + byte[] payload = new byte[sizeInBytes]; + for (int i = 0; i < sizeInBytes; i++) { + payload[i] = (byte) rand.nextInt(256); + } + + LOG.debug("Created buffer with size : " + sizeInBytes + " bytes"); + return payload; + } + + @Test(timeout = 2 * 60 * 1000) + public void testSendSmallerMessages() throws Exception { + for (int i = 512; i <= (16 * 1024); i += 512) { + doTestSendLargeMessage(i); + } + } + + @Test(timeout = 2 * 60 * 1000) + public void testSendFixedSizedMessages() throws Exception { + doTestSendLargeMessage(65536); + doTestSendLargeMessage(65536 * 2); + doTestSendLargeMessage(65536 * 4); + } + + @Test(timeout = 5 * 60 * 1000) + public void testSend10MBMessage() throws Exception { + doTestSendLargeMessage(1024 * 1024 * 10); + } + + @Test(timeout = 5 * 60 * 1000) + public void testSend100MBMessage() throws Exception { + doTestSendLargeMessage(1024 * 1024 * 100); + } + + public void doTestSendLargeMessage(int expectedSize) throws Exception{ + LOG.info("doTestSendLargeMessage called with expectedSize " + expectedSize); + byte[] payload = createLargePayload(expectedSize); + 
assertEquals(expectedSize, payload.length); + + Connection connection = createAmqpConnection(); + + long startTime = System.currentTimeMillis(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(name.getMethodName()); + MessageProducer producer = session.createProducer(queue); + BytesMessage message = session.createBytesMessage(); + message.writeBytes(payload); + producer.send(message); + long endTime = System.currentTimeMillis(); + + LOG.info("Returned from send after {} ms", endTime - startTime); + startTime = System.currentTimeMillis(); + MessageConsumer consumer = session.createConsumer(queue); + connection.start(); + + LOG.info("Calling receive"); + Message received = consumer.receive(); + assertNotNull(received); + assertTrue(received instanceof BytesMessage); + BytesMessage bytesMessage = (BytesMessage) received; + assertNotNull(bytesMessage); + endTime = System.currentTimeMillis(); + + LOG.info("Returned from receive after {} ms", endTime - startTime); + byte[] bytesReceived = new byte[expectedSize]; + assertEquals(expectedSize, bytesMessage.readBytes(bytesReceived, expectedSize)); + assertTrue(Arrays.equals(payload, bytesReceived)); + connection.close(); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/resources/keystore ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/resources/keystore b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/resources/keystore new file mode 100644 index 0000000..9ee6adf Binary files /dev/null and b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/resources/keystore differ http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/resources/log4j.properties ---------------------------------------------------------------------- 
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/resources/log4j.properties b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/resources/log4j.properties new file mode 100644 index 0000000..6456126 --- /dev/null +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/resources/log4j.properties @@ -0,0 +1,41 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- + +# +# The logging properties used during tests. +# +log4j.rootLogger=INFO, out, stdout + +log4j.logger.org.apache.qpid.jms=INFO + +# Tune ActiveMQ and its AMQP transport as needed for debugging. 
+log4j.logger.org.apache.activemq=INFO +log4j.logger.org.apache.activemq.broker=INFO +log4j.logger.org.apache.activemq.transport.amqp=INFO +log4j.logger.org.apache.activemq.transport.amqp.FRAMES=INFO + +# CONSOLE appender (attached to the root logger as 'stdout') +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] - %-5p %-30.30c{1} - %m%n + +# File appender +log4j.appender.out=org.apache.log4j.FileAppender +log4j.appender.out.layout=org.apache.log4j.PatternLayout +log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] - %-5p %-30.30c{1} - %m%n +log4j.appender.out.file=target/activemq-test.log +log4j.appender.out.append=true http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/resources/provider.properties ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/resources/provider.properties b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/resources/provider.properties new file mode 100644 index 0000000..33d06f9 --- /dev/null +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/resources/provider.properties @@ -0,0 +1,20 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. 
You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- + +# This config file is used by the joram jms tests. +# +jms.provider.admin.class=org.apache.qpid.jms.joram.ActiveMQAdmin \ No newline at end of file http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/resources/test.properties ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/resources/test.properties b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/resources/test.properties new file mode 100644 index 0000000..ac4ca8f --- /dev/null +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/resources/test.properties @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# +timeout=10000 \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
