http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java new file mode 100644 index 0000000..4b89851 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java @@ -0,0 +1,584 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.concurrent.CopyOnWriteArrayList; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.TextMessage; +import javax.jms.XAConnection; +import javax.jms.XAQueueConnection; +import javax.jms.XASession; +import javax.jms.XATopicConnection; +import javax.transaction.xa.XAException; +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; + +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerRegistry; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransactionBroker; +import org.apache.activemq.broker.TransportConnection; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.command.ConnectionId; +import org.apache.activemq.command.TransactionInfo; +import org.apache.activemq.command.XATransactionId; +import org.apache.activemq.management.JMSConnectionStatsImpl; +import org.apache.activemq.transport.failover.FailoverTransport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ActiveMQXAConnectionFactoryTest extends CombinationTestSupport { + private static final Logger LOG = LoggerFactory.getLogger(ActiveMQXAConnectionFactoryTest.class); + long txGenerator = System.currentTimeMillis(); + private ActiveMQConnection connection; + private BrokerService broker; + + @Override + public void tearDown() throws Exception { + // Try our best to close any previously opend connection. + try { + connection.close(); + } catch (Throwable ignore) { + } + // Try our best to stop any previously started broker. + try { + broker.stop(); + } catch (Throwable ignore) { + } + } + + public void testCopy() throws URISyntaxException, JMSException { + ActiveMQXAConnectionFactory cf = new ActiveMQXAConnectionFactory("vm://localhost?"); + ActiveMQConnectionFactory copy = cf.copy(); + assertTrue("Should be an ActiveMQXAConnectionFactory", copy instanceof ActiveMQXAConnectionFactory); + } + + public void testUseURIToSetOptionsOnConnectionFactory() throws URISyntaxException, JMSException { + ActiveMQXAConnectionFactory cf = new ActiveMQXAConnectionFactory( + "vm://localhost?jms.useAsyncSend=true"); + assertTrue(cf.isUseAsyncSend()); + // the broker url have been adjusted. + assertEquals("vm://localhost", cf.getBrokerURL()); + + cf = new ActiveMQXAConnectionFactory("vm://localhost?jms.useAsyncSend=false"); + assertFalse(cf.isUseAsyncSend()); + // the broker url have been adjusted. + assertEquals("vm://localhost", cf.getBrokerURL()); + + cf = new ActiveMQXAConnectionFactory("vm:(broker:()/localhost)?jms.useAsyncSend=true"); + assertTrue(cf.isUseAsyncSend()); + // the broker url have been adjusted. + assertEquals("vm:(broker:()/localhost)", cf.getBrokerURL()); + + cf = new ActiveMQXAConnectionFactory( + "vm://localhost?jms.redeliveryPolicy.maximumRedeliveries=10&" + + "jms.redeliveryPolicy.initialRedeliveryDelay=10000&" + + "jms.redeliveryPolicy.redeliveryDelay=10000&" + + "jms.redeliveryPolicy.useExponentialBackOff=true&" + + "jms.redeliveryPolicy.backOffMultiplier=2"); + assertEquals(10, cf.getRedeliveryPolicy().getMaximumRedeliveries()); + assertEquals(10000, cf.getRedeliveryPolicy().getInitialRedeliveryDelay()); + assertEquals(10000, cf.getRedeliveryPolicy().getRedeliveryDelay()); + assertEquals(true, cf.getRedeliveryPolicy().isUseExponentialBackOff()); + assertEquals(2.0, cf.getRedeliveryPolicy().getBackOffMultiplier(), 0.1); + + // the broker url have been adjusted. + assertEquals("vm://localhost", cf.getBrokerURL()); + } + + public void testCreateVMConnectionWithEmbdeddBroker() throws URISyntaxException, JMSException { + ActiveMQXAConnectionFactory cf = new ActiveMQXAConnectionFactory("vm://myBroker?broker.persistent=false"); + // Make sure the broker is not created until the connection is + // instantiated. + assertNull(BrokerRegistry.getInstance().lookup("myBroker")); + connection = (ActiveMQConnection) cf.createConnection(); + // This should create the connection. + assertNotNull(connection); + // Verify the broker was created. + assertNotNull(BrokerRegistry.getInstance().lookup("myBroker")); + connection.close(); + // Verify the broker was destroyed. + assertNull(BrokerRegistry.getInstance().lookup("myBroker")); + + connection.close(); + } + + public void testGetBrokerName() throws URISyntaxException, JMSException { + ActiveMQXAConnectionFactory cf = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false"); + connection = (ActiveMQConnection)cf.createConnection(); + connection.start(); + + String brokerName = connection.getBrokerName(); + LOG.info("Got broker name: " + brokerName); + + assertNotNull("No broker name available!", brokerName); + connection.close(); + } + + public void testCreateTcpConnectionUsingAllocatedPort() throws Exception { + assertCreateConnection("tcp://localhost:0?wireFormat.tcpNoDelayEnabled=true"); + } + + public void testCreateTcpConnectionUsingKnownPort() throws Exception { + assertCreateConnection("tcp://localhost:61610?wireFormat.tcpNoDelayEnabled=true"); + } + + public void testIsSameRM() throws URISyntaxException, JMSException, XAException { + + XAConnection connection1 = null; + XAConnection connection2 = null; + try { + ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false"); + connection1 = (XAConnection)cf1.createConnection(); + XASession session1 = connection1.createXASession(); + XAResource resource1 = session1.getXAResource(); + + ActiveMQXAConnectionFactory cf2 = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false"); + connection2 = (XAConnection)cf2.createConnection(); + XASession session2 = connection2.createXASession(); + XAResource resource2 = session2.getXAResource(); + + assertTrue(resource1.isSameRM(resource2)); + session1.close(); + session2.close(); + } finally { + if (connection1 != null) { + try { + connection1.close(); + } catch (Exception e) { + // ignore + } + } + if (connection2 != null) { + try { + connection2.close(); + } catch (Exception e) { + // ignore + } + } + } + } + + public void testIsSameRMOverride() throws URISyntaxException, JMSException, XAException { + + XAConnection connection1 = null; + XAConnection connection2 = null; + try { + ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false&jms.rmIdFromConnectionId=true"); + connection1 = (XAConnection)cf1.createConnection(); + XASession session1 = connection1.createXASession(); + XAResource resource1 = session1.getXAResource(); + + ActiveMQXAConnectionFactory cf2 = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false"); + connection2 = (XAConnection)cf2.createConnection(); + XASession session2 = connection2.createXASession(); + XAResource resource2 = session2.getXAResource(); + + assertFalse(resource1.isSameRM(resource2)); + + // ensure identity is preserved + XASession session1a = connection1.createXASession(); + assertTrue(resource1.isSameRM(session1a.getXAResource())); + session1.close(); + session2.close(); + } finally { + if (connection1 != null) { + try { + connection1.close(); + } catch (Exception e) { + // ignore + } + } + if (connection2 != null) { + try { + connection2.close(); + } catch (Exception e) { + // ignore + } + } + } + } + + public void testVanilaTransactionalProduceReceive() throws Exception { + + XAConnection connection1 = null; + try { + ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false"); + connection1 = (XAConnection)cf1.createConnection(); + connection1.start(); + XASession session = connection1.createXASession(); + XAResource resource = session.getXAResource(); + Destination dest = new ActiveMQQueue(getName()); + + // publish a message + Xid tid = createXid(); + resource.start(tid, XAResource.TMNOFLAGS); + MessageProducer producer = session.createProducer(dest); + ActiveMQTextMessage message = new ActiveMQTextMessage(); + message.setText(getName()); + producer.send(message); + resource.end(tid, XAResource.TMSUCCESS); + resource.commit(tid, true); + session.close(); + + session = connection1.createXASession(); + MessageConsumer consumer = session.createConsumer(dest); + tid = createXid(); + resource = session.getXAResource(); + resource.start(tid, XAResource.TMNOFLAGS); + TextMessage receivedMessage = (TextMessage) consumer.receive(1000); + assertNotNull(receivedMessage); + assertEquals(getName(), receivedMessage.getText()); + resource.end(tid, XAResource.TMSUCCESS); + resource.commit(tid, true); + session.close(); + + } finally { + if (connection1 != null) { + try { + connection1.close(); + } catch (Exception e) { + // ignore + } + } + } + } + + public void testConsumerCloseTransactionalSendReceive() throws Exception { + + ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false"); + XAConnection connection1 = (XAConnection)cf1.createConnection(); + connection1.start(); + XASession session = connection1.createXASession(); + XAResource resource = session.getXAResource(); + Destination dest = new ActiveMQQueue(getName()); + + // publish a message + Xid tid = createXid(); + resource.start(tid, XAResource.TMNOFLAGS); + MessageProducer producer = session.createProducer(dest); + ActiveMQTextMessage message = new ActiveMQTextMessage(); + message.setText(getName()); + producer.send(message); + producer.close(); + resource.end(tid, XAResource.TMSUCCESS); + resource.commit(tid, true); + session.close(); + + session = connection1.createXASession(); + MessageConsumer consumer = session.createConsumer(dest); + tid = createXid(); + resource = session.getXAResource(); + resource.start(tid, XAResource.TMNOFLAGS); + TextMessage receivedMessage = (TextMessage) consumer.receive(1000); + consumer.close(); + assertNotNull(receivedMessage); + assertEquals(getName(), receivedMessage.getText()); + resource.end(tid, XAResource.TMSUCCESS); + resource.commit(tid, true); + + session = connection1.createXASession(); + consumer = session.createConsumer(dest); + tid = createXid(); + resource = session.getXAResource(); + resource.start(tid, XAResource.TMNOFLAGS); + assertNull(consumer.receive(1000)); + resource.end(tid, XAResource.TMSUCCESS); + resource.commit(tid, true); + + } + + public void testSessionCloseTransactionalSendReceive() throws Exception { + + ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false"); + XAConnection connection1 = (XAConnection)cf1.createConnection(); + connection1.start(); + XASession session = connection1.createXASession(); + XAResource resource = session.getXAResource(); + Destination dest = new ActiveMQQueue(getName()); + + // publish a message + Xid tid = createXid(); + resource.start(tid, XAResource.TMNOFLAGS); + MessageProducer producer = session.createProducer(dest); + ActiveMQTextMessage message = new ActiveMQTextMessage(); + message.setText(getName()); + producer.send(message); + session.close(); + resource.end(tid, XAResource.TMSUCCESS); + resource.commit(tid, true); + + + session = connection1.createXASession(); + MessageConsumer consumer = session.createConsumer(dest); + tid = createXid(); + resource = session.getXAResource(); + resource.start(tid, XAResource.TMNOFLAGS); + TextMessage receivedMessage = (TextMessage) consumer.receive(1000); + session.close(); + assertNotNull(receivedMessage); + assertEquals(getName(), receivedMessage.getText()); + resource.end(tid, XAResource.TMSUCCESS); + resource.commit(tid, true); + + session = connection1.createXASession(); + consumer = session.createConsumer(dest); + tid = createXid(); + resource = session.getXAResource(); + resource.start(tid, XAResource.TMNOFLAGS); + assertNull(consumer.receive(1000)); + resource.end(tid, XAResource.TMSUCCESS); + resource.commit(tid, true); + } + + + public void testReadonlyNoLeak() throws Exception { + final String brokerName = "readOnlyNoLeak"; + BrokerService broker = BrokerFactory.createBroker(new URI("broker:(tcp://localhost:0)/" + brokerName)); + broker.setPersistent(false); + broker.start(); + ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory("failover:(" + broker.getTransportConnectors().get(0).getConnectUri() + ")"); + cf1.setStatsEnabled(true); + ActiveMQXAConnection xaConnection = (ActiveMQXAConnection)cf1.createConnection(); + xaConnection.start(); + XASession session = xaConnection.createXASession(); + XAResource resource = session.getXAResource(); + Xid tid = createXid(); + resource.start(tid, XAResource.TMNOFLAGS); + session.close(); + resource.end(tid, XAResource.TMSUCCESS); + resource.commit(tid, true); + + assertTransactionGoneFromBroker(tid); + assertTransactionGoneFromConnection(brokerName, xaConnection.getClientID(), xaConnection.getConnectionInfo().getConnectionId(), tid); + assertSessionGone(xaConnection, session); + assertTransactionGoneFromFailoverState(xaConnection, tid); + + // two phase + session = xaConnection.createXASession(); + resource = session.getXAResource(); + tid = createXid(); + resource.start(tid, XAResource.TMNOFLAGS); + session.close(); + resource.end(tid, XAResource.TMSUCCESS); + assertEquals(XAResource.XA_RDONLY, resource.prepare(tid)); + + // no need for a commit on read only + assertTransactionGoneFromBroker(tid); + assertTransactionGoneFromConnection(brokerName, xaConnection.getClientID(), xaConnection.getConnectionInfo().getConnectionId(), tid); + assertSessionGone(xaConnection, session); + assertTransactionGoneFromFailoverState(xaConnection, tid); + + xaConnection.close(); + broker.stop(); + + } + + public void testCloseSendConnection() throws Exception { + String brokerName = "closeSend"; + BrokerService broker = BrokerFactory.createBroker(new URI("broker:(tcp://localhost:0)/" + brokerName)); + broker.start(); + broker.waitUntilStarted(); + ActiveMQXAConnectionFactory cf = new ActiveMQXAConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri()); + XAConnection connection = (XAConnection)cf.createConnection(); + connection.start(); + XASession session = connection.createXASession(); + XAResource resource = session.getXAResource(); + Destination dest = new ActiveMQQueue(getName()); + + // publish a message + Xid tid = createXid(); + resource.start(tid, XAResource.TMNOFLAGS); + MessageProducer producer = session.createProducer(dest); + ActiveMQTextMessage message = new ActiveMQTextMessage(); + message.setText(getName()); + producer.send(message); + + connection.close(); + + assertTransactionGoneFromBroker(tid); + + broker.stop(); + } + + public void testExceptionAfterClose() throws Exception { + + ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false"); + XAConnection connection1 = (XAConnection)cf1.createConnection(); + connection1.start(); + + XASession session = connection1.createXASession(); + session.close(); + try { + session.commit(); + fail("expect exception after close"); + } catch (javax.jms.IllegalStateException expected) {} + + try { + session.rollback(); + fail("expect exception after close"); + } catch (javax.jms.IllegalStateException expected) {} + + try { + session.getTransacted(); + fail("expect exception after close"); + } catch (javax.jms.IllegalStateException expected) {} + } + + public void testRollbackXaErrorCode() throws Exception { + String brokerName = "rollbackErrorCode"; + BrokerService broker = BrokerFactory.createBroker(new URI("broker:(tcp://localhost:0)/" + brokerName)); + broker.start(); + broker.waitUntilStarted(); + ActiveMQXAConnectionFactory cf = new ActiveMQXAConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri()); + XAConnection connection = (XAConnection)cf.createConnection(); + connection.start(); + XASession session = connection.createXASession(); + XAResource resource = session.getXAResource(); + + Xid tid = createXid(); + try { + resource.rollback(tid); + fail("Expected xa exception on no tx"); + } catch (XAException expected) { + LOG.info("got expected xa", expected); + assertEquals("no tx", XAException.XAER_NOTA, expected.errorCode); + } + connection.close(); + broker.stop(); + } + + private void assertTransactionGoneFromFailoverState( + ActiveMQXAConnection connection1, Xid tid) throws Exception { + + FailoverTransport transport = (FailoverTransport) connection1.getTransport().narrow(FailoverTransport.class); + TransactionInfo info = new TransactionInfo(connection1.getConnectionInfo().getConnectionId(), new XATransactionId(tid), TransactionInfo.COMMIT_ONE_PHASE); + assertNull("transaction should not exist in the state tracker", + transport.getStateTracker().processCommitTransactionOnePhase(info)); + } + + private void assertSessionGone(ActiveMQXAConnection connection1, + XASession session) { + JMSConnectionStatsImpl stats = (JMSConnectionStatsImpl)connection1.getStats(); + // should be no dangling sessions maintained by the transaction + assertEquals("should be no sessions", 0, stats.getSessions().length); + } + + private void assertTransactionGoneFromConnection(String brokerName, String clientId, ConnectionId connectionId, Xid tid) throws Exception { + BrokerService broker = BrokerRegistry.getInstance().lookup(brokerName); + CopyOnWriteArrayList<TransportConnection> connections = broker.getTransportConnectors().get(0).getConnections(); + for (TransportConnection connection: connections) { + if (connection.getConnectionId().equals(clientId)) { + try { + connection.processPrepareTransaction(new TransactionInfo(connectionId, new XATransactionId(tid), TransactionInfo.PREPARE)); + fail("did not get expected excepton on missing transaction, it must be still there in error!"); + } catch (IllegalStateException expectedOnNoTransaction) { + } + } + } + } + + private void assertTransactionGoneFromBroker(Xid tid) throws Exception { + BrokerService broker = BrokerRegistry.getInstance().lookup("localhost"); + TransactionBroker transactionBroker = (TransactionBroker)broker.getBroker().getAdaptor(TransactionBroker.class); + try { + transactionBroker.getTransaction(null, new XATransactionId(tid), false); + fail("expected exception on tx not found"); + } catch (XAException expectedOnNotFound) { + } + } + + protected void assertCreateConnection(String uri) throws Exception { + // Start up a broker with a tcp connector. + broker = new BrokerService(); + broker.setPersistent(false); + broker.setUseJmx(false); + TransportConnector connector = broker.addConnector(uri); + broker.start(); + + URI temp = new URI(uri); + // URI connectURI = connector.getServer().getConnectURI(); + // TODO this sometimes fails when using the actual local host name + URI currentURI = new URI(connector.getPublishableConnectString()); + + // sometimes the actual host name doesn't work in this test case + // e.g. on OS X so lets use the original details but just use the actual + // port + URI connectURI = new URI(temp.getScheme(), temp.getUserInfo(), temp.getHost(), currentURI.getPort(), + temp.getPath(), temp.getQuery(), temp.getFragment()); + + LOG.info("connection URI is: " + connectURI); + + // This should create the connection. + ActiveMQXAConnectionFactory cf = new ActiveMQXAConnectionFactory(connectURI); + Connection connection = cf.createConnection(); + + assertXAConnection(connection); + + assertNotNull(connection); + connection.close(); + + connection = cf.createXAConnection(); + + assertXAConnection(connection); + + assertNotNull(connection); + } + + private void assertXAConnection(Connection connection) { + assertTrue("Should be an XAConnection", connection instanceof XAConnection); + assertTrue("Should be an XATopicConnection", connection instanceof XATopicConnection); + assertTrue("Should be an XAQueueConnection", connection instanceof XAQueueConnection); + } + + public Xid createXid() throws IOException { + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream os = new DataOutputStream(baos); + os.writeLong(++txGenerator); + os.close(); + final byte[] bs = baos.toByteArray(); + + return new Xid() { + public int getFormatId() { + return 86; + } + + public byte[] getGlobalTransactionId() { + return bs; + } + + public byte[] getBranchQualifier() { + return bs; + } + }; + + } + +}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ClientTestSupport.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ClientTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ClientTestSupport.java new file mode 100644 index 0000000..eafe359 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ClientTestSupport.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.jms.JMSException; + +import junit.framework.TestCase; + +import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.StubConnection; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ConnectionId; +import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageDispatch; +import org.apache.activemq.command.RemoveInfo; +import org.apache.activemq.command.SessionInfo; +import org.apache.activemq.transport.TransportFactory; + +public class ClientTestSupport extends TestCase { + + protected BrokerService broker; + protected long idGenerator; + + private ActiveMQConnectionFactory connFactory; + private final String brokerURL = "vm://localhost?broker.persistent=false"; + + @Override + public void setUp() throws Exception { + final AtomicBoolean connected = new AtomicBoolean(false); + TransportConnector connector; + + // Start up a broker with a tcp connector. + try { + broker = BrokerFactory.createBroker(new URI(this.brokerURL)); + broker.getBrokerName(); + connector = new TransportConnector(TransportFactory.bind(new URI(this.brokerURL))) { + // Hook into the connector so we can assert that the server + // accepted a connection. + @Override + protected org.apache.activemq.broker.Connection createConnection(org.apache.activemq.transport.Transport transport) throws IOException { + connected.set(true); + return super.createConnection(transport); + } + }; + broker.addConnector(connector); + broker.start(); + + } catch (IOException e) { + throw new JMSException("Error creating broker " + e); + } catch (URISyntaxException e) { + throw new JMSException("Error creating broker " + e); + } + + URI connectURI; + connectURI = connector.getServer().getConnectURI(); + + // This should create the connection. + connFactory = new ActiveMQConnectionFactory(connectURI); + } + + @Override + protected void tearDown() throws Exception { + super.tearDown(); + if (broker != null) { + broker.stop(); + } + } + + public ActiveMQConnectionFactory getConnectionFactory() throws JMSException { + if (this.connFactory == null) { + throw new JMSException("ActiveMQConnectionFactory is null "); + } + return this.connFactory; + } + + // Helper Classes + protected ConnectionInfo createConnectionInfo() throws Exception { + ConnectionInfo info = new ConnectionInfo(); + info.setConnectionId(new ConnectionId("connection:" + (++idGenerator))); + info.setClientId(info.getConnectionId().getValue()); + return info; + } + + protected SessionInfo createSessionInfo(ConnectionInfo connectionInfo) throws Exception { + SessionInfo info = new SessionInfo(connectionInfo, ++idGenerator); + return info; + } + + protected ConsumerInfo createConsumerInfo(SessionInfo sessionInfo, ActiveMQDestination destination) throws Exception { + ConsumerInfo info = new ConsumerInfo(sessionInfo, ++idGenerator); + info.setBrowser(false); + info.setDestination(destination); + info.setPrefetchSize(1000); + info.setDispatchAsync(false); + return info; + } + + protected RemoveInfo closeConsumerInfo(ConsumerInfo consumerInfo) { + return consumerInfo.createRemoveCommand(); + } + + protected MessageAck createAck(ConsumerInfo consumerInfo, Message msg, int count, byte ackType) { + MessageAck ack = new MessageAck(); + ack.setAckType(ackType); + ack.setConsumerId(consumerInfo.getConsumerId()); + ack.setDestination(msg.getDestination()); + ack.setLastMessageId(msg.getMessageId()); + ack.setMessageCount(count); + return ack; + } + + protected Message receiveMessage(StubConnection connection, int maxWait) throws InterruptedException { + while (true) { + Object o = connection.getDispatchQueue().poll(maxWait, TimeUnit.MILLISECONDS); + + if (o == null) { + return null; + } + + if (o instanceof MessageDispatch) { + MessageDispatch dispatch = (MessageDispatch)o; + return dispatch.getMessage(); + } + } + } + + protected Broker getBroker() throws Exception { + return this.broker != null ? this.broker.getBroker() : null; + } + + public static void removeMessageStore() { + if (System.getProperty("activemq.store.dir") != null) { + recursiveDelete(new File(System.getProperty("activemq.store.dir"))); + } + if (System.getProperty("derby.system.home") != null) { + recursiveDelete(new File(System.getProperty("derby.system.home"))); + } + } + + public static void recursiveDelete(File f) { + if (f.isDirectory()) { + File[] files = f.listFiles(); + for (int i = 0; i < files.length; i++) { + recursiveDelete(files[i]); + } + } + f.delete(); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/CombinationTestSupport.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/CombinationTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/CombinationTestSupport.java new file mode 100644 index 0000000..a11505c --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/CombinationTestSupport.java @@ -0,0 +1,272 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; +import java.security.ProtectionDomain; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import junit.framework.Test; +import junit.framework.TestSuite; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Poor mans way of getting JUnit to run a test case through a few different + * combinations of options. Usage: If you have a test case called testFoo what + * you want to run through a few combinations, of of values for the attributes + * age and color, you would something like: <code> + * public void initCombosForTestFoo() { + * addCombinationValues( "age", new Object[]{ new Integer(21), new Integer(30) } ); + * addCombinationValues( "color", new Object[]{"blue", "green"} ); + * } + * </code> + * The testFoo test case would be run for each possible combination of age and + * color that you setup in the initCombosForTestFoo method. Before each + * combination is run, the age and color fields of the test class are set to one + * of the values defined. This is done before the normal setUp method is called. + * If you want the test combinations to show up as separate test runs in the + * JUnit reports, add a suite method to your test case similar to: <code> + * public static Test suite() { + * return suite(FooTest.class); + * } + * </code> + * + * + */ +public abstract class CombinationTestSupport extends AutoFailTestSupport { + + private static final Logger LOG = LoggerFactory.getLogger(CombinationTestSupport.class); + + private final HashMap<String, ComboOption> comboOptions = new HashMap<String, ComboOption>(); + private boolean combosEvaluated; + private Map<String, Object> options; + protected File basedir; + + static protected File basedir(Class<?> clazz) { + try { + ProtectionDomain protectionDomain = clazz.getProtectionDomain(); + return new File(new File(protectionDomain.getCodeSource().getLocation().getPath()), "../..").getCanonicalFile(); + } catch (IOException e) { + return new File("."); + } + } + + static class ComboOption { + final String attribute; + final LinkedHashSet<Object> values = new LinkedHashSet<Object>(); + + public ComboOption(String attribute, Collection<Object> options) { + this.attribute = attribute; + this.values.addAll(options); + } + } + + public CombinationTestSupport() { + basedir = basedir(getClass()); + } + public void addCombinationValues(String attribute, Object[] options) { + ComboOption co = this.comboOptions.get(attribute); + if (co == null) { + this.comboOptions.put(attribute, new ComboOption(attribute, Arrays.asList(options))); + } else { + co.values.addAll(Arrays.asList(options)); + } + } + + @Override + public void runBare() throws Throwable { + if (combosEvaluated) { + super.runBare(); + } else { + CombinationTestSupport[] combinations = getCombinations(); + for (int i = 0; i < combinations.length; i++) { + CombinationTestSupport test = combinations[i]; + if (getName() == null || getName().equals(test.getName())) { + test.runBare(); + } + } + } + } + + private void setOptions(Map<String, Object> options) throws NoSuchFieldException, IllegalAccessException { + this.options = options; + for (Iterator<String> iterator = options.keySet().iterator(); iterator.hasNext();) { + String attribute = iterator.next(); + Object value = options.get(attribute); + try { + Field field = getClass().getField(attribute); + field.set(this, value); + } catch (Throwable e) { + try { + boolean found = false; + String setterName = "set" + attribute.substring(0, 1).toUpperCase() + + attribute.substring(1); + for(Method method : getClass().getMethods()) { + if (method.getName().equals(setterName)) { + method.invoke(this, value); + found = true; + break; + } + } + + if (!found) { + throw new NoSuchMethodError("No setter found for field: " + attribute); + } + + } catch(Throwable ex) { + LOG.info("Could not set field '" + attribute + "' to value '" + value + + "', make sure the field exists and is public or has a setter."); + } + } + } + } + + private CombinationTestSupport[] getCombinations() { + try { + Method method = getClass().getMethod("initCombos", (Class[])null); + method.invoke(this, (Object[])null); + } catch (Throwable e) { + } + + String name = getName().split(" ")[0]; + String comboSetupMethodName = "initCombosFor" + Character.toUpperCase(name.charAt(0)) + name.substring(1); + try { + Method method = getClass().getMethod(comboSetupMethodName, (Class[])null); + method.invoke(this, (Object[])null); + } catch (Throwable e) { + } + + try { + ArrayList<HashMap<String, Object>> expandedOptions = new ArrayList<HashMap<String, Object>>(); + expandCombinations(new ArrayList<ComboOption>(comboOptions.values()), expandedOptions); + + if (expandedOptions.isEmpty()) { + combosEvaluated = true; + return new CombinationTestSupport[] {this}; + } else { + + ArrayList<CombinationTestSupport> result = new ArrayList<CombinationTestSupport>(); + // Run the test case for each possible combination + for (Iterator<HashMap<String, Object>> iter = expandedOptions.iterator(); iter.hasNext();) { + CombinationTestSupport combo = (CombinationTestSupport)TestSuite.createTest(getClass(), name); + combo.combosEvaluated = true; + combo.setOptions(iter.next()); + result.add(combo); + } + + CombinationTestSupport rc[] = new CombinationTestSupport[result.size()]; + result.toArray(rc); + return rc; + } + } catch (Throwable e) { + combosEvaluated = true; + return new CombinationTestSupport[] {this}; + } + + } + + private void expandCombinations(List<ComboOption> optionsLeft, List<HashMap<String, Object>> expandedCombos) { + if (!optionsLeft.isEmpty()) { + HashMap<String, Object> map; + if (comboOptions.size() == optionsLeft.size()) { + map = new HashMap<String, Object>(); + expandedCombos.add(map); + } else { + map = expandedCombos.get(expandedCombos.size() - 1); + } + + LinkedList<ComboOption> l = new LinkedList<ComboOption>(optionsLeft); + ComboOption comboOption = l.removeLast(); + int i = 0; + if (comboOption.values.isEmpty() && !l.isEmpty()) { + expandCombinations(l, expandedCombos); + } else { + for (Iterator<Object> iter = comboOption.values.iterator(); iter.hasNext();) { + Object value = iter.next(); + if (i != 0) { + map = new HashMap<String, Object>(map); + expandedCombos.add(map); + } + map.put(comboOption.attribute, value); + expandCombinations(l, expandedCombos); + i++; + } + } + } + } + + public static Test suite(Class<? extends CombinationTestSupport> clazz) { + TestSuite suite = new TestSuite(); + + ArrayList<String> names = new ArrayList<String>(); + Method[] methods = clazz.getMethods(); + for (int i = 0; i < methods.length; i++) { + String name = methods[i].getName(); + if (names.contains(name) || !isPublicTestMethod(methods[i])) { + continue; + } + names.add(name); + Test test = TestSuite.createTest(clazz, name); + if (test instanceof CombinationTestSupport) { + CombinationTestSupport[] combinations = ((CombinationTestSupport)test).getCombinations(); + for (int j = 0; j < combinations.length; j++) { + suite.addTest(combinations[j]); + } + } else { + suite.addTest(test); + } + } + return suite; + } + + private static boolean isPublicTestMethod(Method m) { + return isTestMethod(m) && Modifier.isPublic(m.getModifiers()); + } + + private static boolean isTestMethod(Method m) { + String name = m.getName(); + Class<?>[] parameters = m.getParameterTypes(); + Class<?> returnType = m.getReturnType(); + return parameters.length == 0 && name.startsWith("test") && returnType.equals(Void.TYPE); + } + + @Override + public String getName() { + return getName(false); + } + + public String getName(boolean original) { + if (options != null && !original) { + return super.getName() + " " + options; + } + return super.getName(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCleanupTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCleanupTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCleanupTest.java new file mode 100644 index 0000000..f7a64ac --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCleanupTest.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import javax.jms.JMSException; +import javax.jms.Session; + +import junit.framework.TestCase; + +/** + * + */ +public class ConnectionCleanupTest extends TestCase { + + private ActiveMQConnection connection; + + protected void setUp() throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost"); + connection = (ActiveMQConnection)factory.createConnection(); + } + + /** + * @see junit.framework.TestCase#tearDown() + */ + protected void tearDown() throws Exception { + connection.close(); + } + + /** + * @throws JMSException + */ + public void testChangeClientID() throws JMSException { + + connection.setClientID("test"); + connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + try { + connection.setClientID("test"); + // fail("Should have received JMSException"); + } catch (JMSException e) { + } + + connection.cleanup(); + connection.setClientID("test"); + + connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + try { + connection.setClientID("test"); + // fail("Should have received JMSException"); + } catch (JMSException e) { + } + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCloseMultipleTimesConcurrentTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCloseMultipleTimesConcurrentTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCloseMultipleTimesConcurrentTest.java new file mode 100644 index 0000000..ae15671 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCloseMultipleTimesConcurrentTest.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import javax.jms.JMSException; +import javax.jms.Session; + +import junit.framework.TestCase; + +/** + * + */ +public class ConnectionCloseMultipleTimesConcurrentTest extends TestCase { + + private ActiveMQConnection connection; + private ExecutorService executor; + private int size = 200; + + protected void setUp() throws Exception { + executor = Executors.newFixedThreadPool(20); + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost"); + connection = (ActiveMQConnection)factory.createConnection(); + connection.start(); + } + + /** + * @see junit.framework.TestCase#tearDown() + */ + protected void tearDown() throws Exception { + if (connection.isStarted()) { + connection.stop(); + } + if (executor != null) { + executor.shutdownNow(); + } + } + + /** + * @throws javax.jms.JMSException + */ + public void testCloseMultipleTimes() throws Exception { + connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + assertTrue(connection.isStarted()); + assertFalse(connection.isClosed()); + + final CountDownLatch latch = new CountDownLatch(size); + + for (int i = 0; i < size; i++) { + executor.submit(new Runnable() { + @Override + public void run() { + try { + connection.close(); + + assertFalse(connection.isStarted()); + assertTrue(connection.isClosed()); + + latch.countDown(); + } catch (JMSException e) { + // ignore + } + } + }); + } + + boolean zero = latch.await(20, TimeUnit.SECONDS); + assertTrue("Should complete all", zero); + + // should not fail calling again + connection.close(); + + assertFalse(connection.isStarted()); + assertTrue(connection.isClosed()); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCloseMultipleTimesTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCloseMultipleTimesTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCloseMultipleTimesTest.java new file mode 100644 index 0000000..3e78d73 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCloseMultipleTimesTest.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import javax.jms.JMSException; +import javax.jms.Session; + +import junit.framework.TestCase; + +/** + * + */ +public class ConnectionCloseMultipleTimesTest extends TestCase { + + private ActiveMQConnection connection; + + protected void setUp() throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost"); + connection = (ActiveMQConnection)factory.createConnection(); + connection.start(); + } + + /** + * @see junit.framework.TestCase#tearDown() + */ + protected void tearDown() throws Exception { + if (connection.isStarted()) { + connection.stop(); + } + } + + /** + * @throws javax.jms.JMSException + */ + public void testCloseMultipleTimes() throws JMSException { + connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + assertTrue(connection.isStarted()); + assertFalse(connection.isClosed()); + + connection.close(); + + assertFalse(connection.isStarted()); + assertTrue(connection.isClosed()); + + // should not fail calling again + connection.close(); + + assertFalse(connection.isStarted()); + assertTrue(connection.isClosed()); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConsumerReceiveWithTimeoutTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConsumerReceiveWithTimeoutTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConsumerReceiveWithTimeoutTest.java new file mode 100644 index 0000000..b34fe7f --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConsumerReceiveWithTimeoutTest.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +/** + * + */ +public class ConsumerReceiveWithTimeoutTest extends TestSupport { + + private Connection connection; + + protected void setUp() throws Exception { + super.setUp(); + connection = createConnection(); + } + + /** + * @see junit.framework.TestCase#tearDown() + */ + protected void tearDown() throws Exception { + if (connection != null) { + connection.close(); + connection = null; + } + super.tearDown(); + } + + /** + * Test to check if consumer thread wakes up inside a receive(timeout) after + * a message is dispatched to the consumer + * + * @throws javax.jms.JMSException + */ + public void testConsumerReceiveBeforeMessageDispatched() throws JMSException { + + connection.start(); + + final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final Queue queue = session.createQueue("test"); + + Thread t = new Thread() { + public void run() { + try { + // wait for 10 seconds to allow consumer.receive to be run + // first + Thread.sleep(10000); + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("Hello")); + } catch (Exception e) { + e.printStackTrace(); + } + } + }; + + t.start(); + + // Consume the message... + MessageConsumer consumer = session.createConsumer(queue); + Message msg = consumer.receive(60000); + assertNotNull(msg); + session.close(); + + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/CreateConsumerButDontStartConnectionWarningTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/CreateConsumerButDontStartConnectionWarningTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/CreateConsumerButDontStartConnectionWarningTest.java new file mode 100644 index 0000000..773e871 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/CreateConsumerButDontStartConnectionWarningTest.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import javax.jms.JMSException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +public class CreateConsumerButDontStartConnectionWarningTest extends JmsQueueSendReceiveTest { + private static final transient Logger LOG = LoggerFactory.getLogger(CreateConsumerButDontStartConnectionWarningTest.class); + + @Override + protected void startConnection() throws JMSException { + // don't start the connection + } + + @Override + protected void assertMessagesAreReceived() throws JMSException { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + LOG.warn("Caught: " + e, e); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerAndConnectionTestSupport.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerAndConnectionTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerAndConnectionTestSupport.java new file mode 100644 index 0000000..aa39cc1 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerAndConnectionTestSupport.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import javax.jms.Connection; + +/** + * A base class for a test case which creates an embedded broker and uses a connection and session + * + * + */ +public abstract class EmbeddedBrokerAndConnectionTestSupport extends EmbeddedBrokerTestSupport { + protected Connection connection; + + @Override + protected void setUp() throws Exception { + super.setUp(); + + connection = createConnection(); + connection.start(); + } + + @Override + protected void tearDown() throws Exception { + if (connection != null) { + connection.close(); + } + super.tearDown(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java new file mode 100644 index 0000000..b049e96 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.springframework.jms.core.JmsTemplate; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; + +/** + * A useful base class which creates and closes an embedded broker + * + * + */ +public abstract class EmbeddedBrokerTestSupport extends CombinationTestSupport { + + protected BrokerService broker; + // protected String bindAddress = "tcp://localhost:61616"; + protected String bindAddress = "vm://localhost"; + protected ConnectionFactory connectionFactory; + protected boolean useTopic; + protected ActiveMQDestination destination; + protected JmsTemplate template; + + protected void setUp() throws Exception { + if (broker == null) { + broker = createBroker(); + } + startBroker(); + + connectionFactory = createConnectionFactory(); + + destination = createDestination(); + + template = createJmsTemplate(); + template.setDefaultDestination(destination); + template.setPubSubDomain(useTopic); + template.afterPropertiesSet(); + } + + protected void tearDown() throws Exception { + if (broker != null) { + try { + broker.stop(); + } catch (Exception e) { + } + } + } + + /** + * Factory method to create a new {@link JmsTemplate} + * + * @return a newly created JmsTemplate + */ + protected JmsTemplate createJmsTemplate() { + return new JmsTemplate(connectionFactory); + } + + /** + * Factory method to create a new {@link Destination} + * + * @return newly created Destinaiton + */ + protected ActiveMQDestination createDestination() { + return createDestination(getDestinationString()); + } + + /** + * Factory method to create the destination in either the queue or topic + * space based on the value of the {@link #useTopic} field + */ + protected ActiveMQDestination createDestination(String subject) { + if (useTopic) { + return new ActiveMQTopic(subject); + } else { + return new ActiveMQQueue(subject); + } + } + + /** + * Returns the name of the destination used in this test case + */ + protected String getDestinationString() { + return getClass().getName() + "." + getName(); + } + + /** + * Factory method to create a new {@link ConnectionFactory} instance + * + * @return a newly created connection factory + */ + protected ConnectionFactory createConnectionFactory() throws Exception { + return new ActiveMQConnectionFactory(bindAddress); + } + + /** + * Factory method to create a new broker + * + * @throws Exception + */ + protected BrokerService createBroker() throws Exception { + BrokerService answer = new BrokerService(); + answer.setPersistent(isPersistent()); + answer.addConnector(bindAddress); + return answer; + } + + protected void startBroker() throws Exception { + broker.start(); + } + + /** + * @return whether or not persistence should be used + */ + protected boolean isPersistent() { + return false; + } + + /** + * Factory method to create a new connection + */ + protected Connection createConnection() throws Exception { + return connectionFactory.createConnection(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ExclusiveConsumerStartupDestinationTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ExclusiveConsumerStartupDestinationTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ExclusiveConsumerStartupDestinationTest.java new file mode 100644 index 0000000..7aa22e3 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ExclusiveConsumerStartupDestinationTest.java @@ -0,0 +1,203 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQQueue; + +public class ExclusiveConsumerStartupDestinationTest extends EmbeddedBrokerTestSupport{ + + private static final String VM_BROKER_URL = "vm://localhost"; + + @Override + protected BrokerService createBroker() throws Exception { + BrokerService answer = new BrokerService(); + answer.setPersistent(false); + PolicyMap map = new PolicyMap(); + PolicyEntry entry = new PolicyEntry(); + entry.setAllConsumersExclusiveByDefault(true); + map.setDefaultEntry(entry); + answer.setDestinationPolicy(map); + return answer; + } + + protected String getBrokerConfigUri() { + return "org/apache/activemq/broker/exclusive-consumer-startup-destination.xml"; + } + + private Connection createConnection(final boolean start) throws JMSException { + ConnectionFactory cf = new ActiveMQConnectionFactory(VM_BROKER_URL); + Connection conn = cf.createConnection(); + if (start) { + conn.start(); + } + return conn; + } + + public void testExclusiveConsumerSelectedCreatedFirst() throws JMSException, InterruptedException { + Connection conn = createConnection(true); + + Session exclusiveSession = null; + Session fallbackSession = null; + Session senderSession = null; + + try { + + exclusiveSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE1"); + MessageConsumer exclusiveConsumer = exclusiveSession.createConsumer(exclusiveQueue); + + ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE1"); + MessageConsumer fallbackConsumer = fallbackSession.createConsumer(fallbackQueue); + + ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE1"); + + MessageProducer producer = senderSession.createProducer(senderQueue); + + Message msg = senderSession.createTextMessage("test"); + producer.send(msg); + // TODO need two send a 2nd message - bug AMQ-1024 + // producer.send(msg); + Thread.sleep(100); + + // Verify exclusive consumer receives the message. + assertNotNull(exclusiveConsumer.receive(100)); + assertNull(fallbackConsumer.receive(100)); + } finally { + fallbackSession.close(); + senderSession.close(); + conn.close(); + } + } + + public void testFailoverToAnotherExclusiveConsumerCreatedFirst() throws JMSException, + InterruptedException { + Connection conn = createConnection(true); + + Session exclusiveSession1 = null; + Session exclusiveSession2 = null; + Session fallbackSession = null; + Session senderSession = null; + + try { + + exclusiveSession1 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + exclusiveSession2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // This creates the exclusive consumer first which avoids AMQ-1024 + // bug. + ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE2"); + MessageConsumer exclusiveConsumer1 = exclusiveSession1.createConsumer(exclusiveQueue); + MessageConsumer exclusiveConsumer2 = exclusiveSession2.createConsumer(exclusiveQueue); + + ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE2"); + MessageConsumer fallbackConsumer = fallbackSession.createConsumer(fallbackQueue); + + ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE2"); + + MessageProducer producer = senderSession.createProducer(senderQueue); + + Message msg = senderSession.createTextMessage("test"); + producer.send(msg); + Thread.sleep(100); + + // Verify exclusive consumer receives the message. + assertNotNull(exclusiveConsumer1.receive(100)); + assertNull(exclusiveConsumer2.receive(100)); + assertNull(fallbackConsumer.receive(100)); + + // Close the exclusive consumer to verify the non-exclusive consumer + // takes over + exclusiveConsumer1.close(); + + producer.send(msg); + producer.send(msg); + + assertNotNull("Should have received a message", exclusiveConsumer2.receive(100)); + assertNull("Should not have received a message", fallbackConsumer.receive(100)); + + } finally { + fallbackSession.close(); + senderSession.close(); + conn.close(); + } + + } + + public void testFailoverToNonExclusiveConsumer() throws JMSException, InterruptedException { + Connection conn = createConnection(true); + + Session exclusiveSession = null; + Session fallbackSession = null; + Session senderSession = null; + + try { + + exclusiveSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // This creates the exclusive consumer first which avoids AMQ-1024 + // bug. + ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE3"); + MessageConsumer exclusiveConsumer = exclusiveSession.createConsumer(exclusiveQueue); + + ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE3"); + MessageConsumer fallbackConsumer = fallbackSession.createConsumer(fallbackQueue); + + ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE3"); + + MessageProducer producer = senderSession.createProducer(senderQueue); + + Message msg = senderSession.createTextMessage("test"); + producer.send(msg); + Thread.sleep(100); + + // Verify exclusive consumer receives the message. + assertNotNull(exclusiveConsumer.receive(100)); + assertNull(fallbackConsumer.receive(100)); + + // Close the exclusive consumer to verify the non-exclusive consumer + // takes over + exclusiveConsumer.close(); + + producer.send(msg); + + assertNotNull(fallbackConsumer.receive(100)); + + } finally { + fallbackSession.close(); + senderSession.close(); + conn.close(); + } + } +}
