http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTestSupport.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTestSupport.java new file mode 100644 index 0000000..5fa0620 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTestSupport.java @@ -0,0 +1,357 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.concurrent.TimeUnit; + +import javax.jms.DeliveryMode; +import javax.jms.MessageNotWriteableException; + +import org.apache.activemq.CombinationTestSupport; +import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.broker.region.policy.FixedCountSubscriptionRecoveryPolicy; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.command.ConnectionId; +import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.DestinationInfo; +import org.apache.activemq.command.LocalTransactionId; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageDispatch; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.RemoveInfo; +import org.apache.activemq.command.SessionInfo; +import org.apache.activemq.command.TransactionId; +import org.apache.activemq.command.TransactionInfo; +import org.apache.activemq.command.XATransactionId; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.usage.SystemUsage; + +public class BrokerTestSupport extends CombinationTestSupport { + + /** + * Setting this to false makes the test run faster but they may be less + * accurate. + */ + public static final boolean FAST_NO_MESSAGE_LEFT_ASSERT = System.getProperty("FAST_NO_MESSAGE_LEFT_ASSERT", "true").equals("true"); + + protected RegionBroker regionBroker; + public BrokerService broker; + protected long idGenerator; + protected int msgIdGenerator; + protected int txGenerator; + protected int tempDestGenerator; + public PersistenceAdapter persistenceAdapter; + + protected String queueName = "TEST"; + + protected int maxWait = 10000; + + protected SystemUsage memoryManager; + protected PolicyMap policyMap = new PolicyMap(); + + @Override + protected void setUp() throws Exception { + super.setUp(); + broker = createBroker(); + policyMap.setDefaultEntry(getDefaultPolicy()); + broker.setDestinationPolicy(policyMap); + broker.start(); + } + + protected PolicyEntry getDefaultPolicy() { + PolicyEntry policy = new PolicyEntry(); + policy.setDispatchPolicy(new RoundRobinDispatchPolicy()); + policy.setSubscriptionRecoveryPolicy(new FixedCountSubscriptionRecoveryPolicy()); + return policy; + } + + protected BrokerService createBroker() throws Exception { + BrokerService broker = BrokerFactory.createBroker(new URI("broker:()/localhost?persistent=false")); + return broker; + } + + @Override + protected void tearDown() throws Exception { + broker.stop(); + broker.waitUntilStopped(); + broker = null; + regionBroker = null; + persistenceAdapter = null; + memoryManager = null; + super.tearDown(); + } + + protected ConsumerInfo createConsumerInfo(SessionInfo sessionInfo, ActiveMQDestination destination) throws Exception { + ConsumerInfo info = new ConsumerInfo(sessionInfo, ++idGenerator); + info.setBrowser(false); + info.setDestination(destination); + info.setPrefetchSize(1000); + info.setDispatchAsync(false); + return info; + } + + protected RemoveInfo closeConsumerInfo(ConsumerInfo consumerInfo) { + return consumerInfo.createRemoveCommand(); + } + + protected ProducerInfo createProducerInfo(SessionInfo sessionInfo) throws Exception { + ProducerInfo info = new ProducerInfo(sessionInfo, ++idGenerator); + return info; + } + + protected SessionInfo createSessionInfo(ConnectionInfo connectionInfo) throws Exception { + SessionInfo info = new SessionInfo(connectionInfo, ++idGenerator); + return info; + } + + protected ConnectionInfo createConnectionInfo() throws Exception { + ConnectionInfo info = new ConnectionInfo(); + info.setConnectionId(new ConnectionId("connection:" + (++idGenerator))); + info.setClientId(info.getConnectionId().getValue()); + return info; + } + + protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination) { + ActiveMQTextMessage message = new ActiveMQTextMessage(); + message.setMessageId(new MessageId(producerInfo, ++msgIdGenerator)); + message.setDestination(destination); + message.setPersistent(false); + try { + message.setText("Test Message Payload."); + } catch (MessageNotWriteableException e) { + } + return message; + } + + protected MessageAck createAck(ConsumerInfo consumerInfo, Message msg, int count, byte ackType) { + MessageAck ack = new MessageAck(); + ack.setAckType(ackType); + ack.setConsumerId(consumerInfo.getConsumerId()); + ack.setDestination(msg.getDestination()); + ack.setLastMessageId(msg.getMessageId()); + ack.setMessageCount(count); + return ack; + } + + protected void gc() { + regionBroker.gc(); + } + + protected void profilerPause(String prompt) throws IOException { + if (System.getProperty("profiler") != null) { + System.out.println(); + System.out.println(prompt + "> Press enter to continue: "); + while (System.in.read() != '\n') { + } + System.out.println(prompt + "> Done."); + } + } + + protected RemoveInfo closeConnectionInfo(ConnectionInfo info) { + return info.createRemoveCommand(); + } + + protected RemoveInfo closeSessionInfo(SessionInfo info) { + return info.createRemoveCommand(); + } + + protected RemoveInfo closeProducerInfo(ProducerInfo info) { + return info.createRemoveCommand(); + } + + protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination, int deliveryMode) { + Message message = createMessage(producerInfo, destination); + message.setPersistent(deliveryMode == DeliveryMode.PERSISTENT); + return message; + } + + protected LocalTransactionId createLocalTransaction(SessionInfo info) { + LocalTransactionId id = new LocalTransactionId(info.getSessionId().getParentId(), ++txGenerator); + return id; + } + + protected XATransactionId createXATransaction(SessionInfo info) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream os = new DataOutputStream(baos); + os.writeLong(++txGenerator); + os.close(); + byte[] bs = baos.toByteArray(); + + XATransactionId xid = new XATransactionId(); + xid.setBranchQualifier(bs); + xid.setGlobalTransactionId(bs); + xid.setFormatId(55); + return xid; + } + + protected TransactionInfo createBeginTransaction(ConnectionInfo connectionInfo, TransactionId txid) { + TransactionInfo info = new TransactionInfo(connectionInfo.getConnectionId(), txid, TransactionInfo.BEGIN); + return info; + } + + protected TransactionInfo createPrepareTransaction(ConnectionInfo connectionInfo, TransactionId txid) { + TransactionInfo info = new TransactionInfo(connectionInfo.getConnectionId(), txid, TransactionInfo.PREPARE); + return info; + } + + protected TransactionInfo createCommitTransaction1Phase(ConnectionInfo connectionInfo, TransactionId txid) { + TransactionInfo info = new TransactionInfo(connectionInfo.getConnectionId(), txid, TransactionInfo.COMMIT_ONE_PHASE); + return info; + } + + protected TransactionInfo createCommitTransaction2Phase(ConnectionInfo connectionInfo, TransactionId txid) { + TransactionInfo info = new TransactionInfo(connectionInfo.getConnectionId(), txid, TransactionInfo.COMMIT_TWO_PHASE); + return info; + } + + protected TransactionInfo createRollbackTransaction(ConnectionInfo connectionInfo, TransactionId txid) { + TransactionInfo info = new TransactionInfo(connectionInfo.getConnectionId(), txid, TransactionInfo.ROLLBACK); + return info; + } + + protected int countMessagesInQueue(StubConnection connection, ConnectionInfo connectionInfo, ActiveMQDestination destination) throws Exception { + + SessionInfo sessionInfo = createSessionInfo(connectionInfo); + connection.send(sessionInfo); + ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); + consumerInfo.setPrefetchSize(1); + consumerInfo.setBrowser(true); + connection.send(consumerInfo); + + ArrayList<Object> skipped = new ArrayList<Object>(); + + // Now get the messages. + Object m = connection.getDispatchQueue().poll(maxWait, TimeUnit.MILLISECONDS); + int i = 0; + while (m != null) { + if (m instanceof MessageDispatch && ((MessageDispatch)m).getConsumerId().equals(consumerInfo.getConsumerId())) { + MessageDispatch md = (MessageDispatch)m; + if (md.getMessage() != null) { + i++; + connection.send(createAck(consumerInfo, md.getMessage(), 1, MessageAck.STANDARD_ACK_TYPE)); + } else { + break; + } + } else { + skipped.add(m); + } + m = connection.getDispatchQueue().poll(maxWait, TimeUnit.MILLISECONDS); + } + + for (Iterator<Object> iter = skipped.iterator(); iter.hasNext();) { + connection.getDispatchQueue().put(iter.next()); + } + + connection.send(closeSessionInfo(sessionInfo)); + return i; + + } + + protected DestinationInfo createTempDestinationInfo(ConnectionInfo connectionInfo, byte destinationType) { + DestinationInfo info = new DestinationInfo(); + info.setConnectionId(connectionInfo.getConnectionId()); + info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE); + info.setDestination(ActiveMQDestination.createDestination(info.getConnectionId() + ":" + (++tempDestGenerator), destinationType)); + return info; + } + + protected ActiveMQDestination createDestinationInfo(StubConnection connection, ConnectionInfo connectionInfo1, byte destinationType) throws Exception { + if ((destinationType & ActiveMQDestination.TEMP_MASK) != 0) { + DestinationInfo info = createTempDestinationInfo(connectionInfo1, destinationType); + connection.send(info); + return info.getDestination(); + } else { + return ActiveMQDestination.createDestination(queueName, destinationType); + } + } + + protected DestinationInfo closeDestinationInfo(DestinationInfo info) { + info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE); + info.setTimeout(0); + return info; + } + + public static void recursiveDelete(File f) { + if (f.isDirectory()) { + File[] files = f.listFiles(); + for (int i = 0; i < files.length; i++) { + recursiveDelete(files[i]); + } + } + f.delete(); + } + + protected StubConnection createConnection() throws Exception { + return new StubConnection(broker); + } + + /** + * @param connection + * @return + * @throws InterruptedException + */ + public Message receiveMessage(StubConnection connection) throws InterruptedException { + return receiveMessage(connection, maxWait); + } + + public Message receiveMessage(StubConnection connection, long timeout) throws InterruptedException { + while (true) { + Object o = connection.getDispatchQueue().poll(timeout, TimeUnit.MILLISECONDS); + + if (o == null) { + return null; + } + if (o instanceof MessageDispatch) { + + MessageDispatch dispatch = (MessageDispatch)o; + if (dispatch.getMessage() == null) { + return null; + } + dispatch.setMessage(dispatch.getMessage().copy()); + dispatch.getMessage().setRedeliveryCounter(dispatch.getRedeliveryCounter()); + return dispatch.getMessage(); + } + } + }; + + protected void assertNoMessagesLeft(StubConnection connection) throws InterruptedException { + long wait = FAST_NO_MESSAGE_LEFT_ASSERT ? 0 : maxWait; + while (true) { + Object o = connection.getDispatchQueue().poll(wait, TimeUnit.MILLISECONDS); + if (o == null) { + return; + } + if (o instanceof MessageDispatch && ((MessageDispatch)o).getMessage() != null) { + fail("Received a message: "+((MessageDispatch)o).getMessage().getMessageId()); + } + } + } +}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ConcurrentConnectSimulationTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ConcurrentConnectSimulationTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ConcurrentConnectSimulationTest.java new file mode 100644 index 0000000..0c791fd --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ConcurrentConnectSimulationTest.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +import junit.framework.Test; + +import org.apache.activemq.advisory.AdvisorySupport; +import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.SessionId; + +public class ConcurrentConnectSimulationTest extends BrokerTestSupport { + + /* + * simulate failover and retry of connection before broker has killed connection + * which appears as a concurrent connect request to the broker + * see: https://issues.apache.org/activemq/browse/AMQ-2241 + */ + public void testConcurrentConnection() throws Exception { + + StubConnection connection1 = createConnection(); + StubConnection connection2 = createConnection(); + + // reuse same connection info + ConnectionInfo connectionInfo = createConnectionInfo(); + connection1.request(connectionInfo); + connection2.request(connectionInfo); + + // second one should win out, verify using consumer on default session (watchAdvisories) + ConsumerId consumerId = new ConsumerId(new SessionId(connectionInfo.getConnectionId(), -1), 1); + ConsumerInfo consumerInfo = new ConsumerInfo(consumerId); + consumerInfo.setDestination(AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC); + + connection2.request(consumerInfo); + } + + public static Test suite() { + return suite(ConcurrentConnectSimulationTest.class); + } + + public static void main(String[] args) { + junit.textui.TestRunner.run(suite()); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/CreateDestinationsOnStartupViaXBeanTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/CreateDestinationsOnStartupViaXBeanTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/CreateDestinationsOnStartupViaXBeanTest.java new file mode 100644 index 0000000..70fda7c --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/CreateDestinationsOnStartupViaXBeanTest.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +import java.net.URI; +import java.util.Set; + +import org.apache.activemq.EmbeddedBrokerTestSupport; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.xbean.XBeanBrokerFactory; + +/** + * + * + */ +public class CreateDestinationsOnStartupViaXBeanTest extends EmbeddedBrokerTestSupport { + + public void testNewDestinationsAreCreatedOnStartup() throws Exception { + assertQueueCreated("FOO.BAR", true); + assertQueueCreated("FOO.DoesNotExist", false); + + assertTopicCreated("SOME.TOPIC", true); + assertTopicCreated("FOO.DoesNotExist", false); + } + + protected void assertQueueCreated(String name, boolean expected) throws Exception { + assertDestinationCreated(new ActiveMQQueue(name), expected); + } + + protected void assertTopicCreated(String name, boolean expected) throws Exception { + assertDestinationCreated(new ActiveMQTopic(name), expected); + } + + protected void assertDestinationCreated(ActiveMQDestination destination, boolean expected) throws Exception { + Set answer = broker.getBroker().getDestinations(destination); + int size = expected ? 1 : 0; + assertEquals("Could not find destination: " + destination + ". Size of found destinations: " + answer, size, answer.size()); + } + + protected BrokerService createBroker() throws Exception { + XBeanBrokerFactory factory = new XBeanBrokerFactory(); + BrokerService answer = factory.createBroker(new URI(getBrokerConfigUri())); + + // lets disable persistence as we are a test + answer.setPersistent(false); + + return answer; + } + + protected String getBrokerConfigUri() { + return "org/apache/activemq/broker/destinations-on-start.xml"; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/DedicatedTaskRunnerBrokerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/DedicatedTaskRunnerBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/DedicatedTaskRunnerBrokerTest.java new file mode 100644 index 0000000..c186420 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/DedicatedTaskRunnerBrokerTest.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +import junit.framework.Test; + +public class DedicatedTaskRunnerBrokerTest extends BrokerTest { + + protected BrokerService createBroker() throws Exception { + BrokerService broker = super.createBroker(); + broker.setDedicatedTaskRunner(true); + return broker; + } + + public static Test suite() { + return suite(DedicatedTaskRunnerBrokerTest.class); + } + + public static void main(String[] args) { + junit.textui.TestRunner.run(suite()); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/DoubleSubscriptionTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/DoubleSubscriptionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/DoubleSubscriptionTest.java new file mode 100644 index 0000000..8fd1292 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/DoubleSubscriptionTest.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +import javax.jms.DeliveryMode; + +import junit.framework.Test; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.SessionInfo; +import org.apache.activemq.network.NetworkTestSupport; + +/** + * Pretend to be an abusive client that sends multiple identical ConsumerInfo + * commands and make sure the broker doesn't stall because of it. + */ + +public class DoubleSubscriptionTest extends NetworkTestSupport { + + public ActiveMQDestination destination; + public int deliveryMode; + + private String remoteURI = "tcp://localhost:0?wireFormat.tcpNoDelayEnabled=true"; + + public static Test suite() { + return suite(DoubleSubscriptionTest.class); + } + + public static void main(String[] args) { + junit.textui.TestRunner.run(suite()); + } + + public void initCombosForTestDoubleSubscription() { + addCombinationValues("destination", new Object[] {new ActiveMQQueue("TEST"), new ActiveMQQueue("TEST")}); + } + + public void testDoubleSubscription() throws Exception { + + // Start a normal consumer on the remote broker + StubConnection connection1 = createRemoteConnection(); + ConnectionInfo connectionInfo1 = createConnectionInfo(); + SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); + ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); + connection1.send(connectionInfo1); + connection1.send(sessionInfo1); + connection1.request(consumerInfo1); + + // Start a normal producer on a remote broker + StubConnection connection2 = createRemoteConnection(); + ConnectionInfo connectionInfo2 = createConnectionInfo(); + SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); + ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2); + connection2.send(connectionInfo2); + connection2.send(sessionInfo2); + connection2.request(producerInfo2); + + // Send a message to make sure the basics are working + connection2.request(createMessage(producerInfo2, destination, DeliveryMode.PERSISTENT)); + + Message m1 = receiveMessage(connection1); + assertNotNull(m1); + assertNoMessagesLeft(connection1); + + connection1.send(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE)); + + // Send a message to sit on the broker while we mess with it + connection2.request(createMessage(producerInfo2, destination, DeliveryMode.PERSISTENT)); + + // Now we're going to resend the same consumer commands again and see if + // the broker + // can handle it. + connection1.send(connectionInfo1); + connection1.send(sessionInfo1); + connection1.request(consumerInfo1); + + // After this there should be 2 messages on the broker... + connection2.request(createMessage(producerInfo2, destination, DeliveryMode.PERSISTENT)); + + // ... let's start a fresh consumer... + connection1.stop(); + StubConnection connection3 = createRemoteConnection(); + ConnectionInfo connectionInfo3 = createConnectionInfo(); + SessionInfo sessionInfo3 = createSessionInfo(connectionInfo3); + ConsumerInfo consumerInfo3 = createConsumerInfo(sessionInfo3, destination); + connection3.send(connectionInfo3); + connection3.send(sessionInfo3); + connection3.request(consumerInfo3); + + // ... and then grab the 2 that should be there. + assertNotNull(receiveMessage(connection3)); + assertNotNull(receiveMessage(connection3)); + assertNoMessagesLeft(connection3); + } + + protected String getRemoteURI() { + return remoteURI; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/DurablePersistentFalseRestartTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/DurablePersistentFalseRestartTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/DurablePersistentFalseRestartTest.java new file mode 100644 index 0000000..b972498 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/DurablePersistentFalseRestartTest.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import junit.framework.Test; +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.transport.failover.FailoverTransport; + +public class DurablePersistentFalseRestartTest extends BrokerRestartTestSupport { + + @Override + protected void configureBroker(BrokerService broker) throws Exception { + super.configureBroker(broker); + broker.setPersistent(false); + broker.setPersistenceAdapter(new KahaDBPersistenceAdapter()); + broker.addConnector("tcp://0.0.0.0:0"); + } + + public void testValidateNoPersistenceForDurableAfterRestart() throws Exception { + + ConnectionFactory connectionFactory = + new ActiveMQConnectionFactory("failover:(" + broker.getTransportConnectors().get(0).getPublishableConnectString() + ")"); + ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection(); + connection.setClientID("clientId"); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Topic destination = session.createTopic(queueName); + MessageConsumer consumer = session.createDurableSubscriber(destination, "subscriberName"); + + populateDestination(10, destination, connection); + + restartBroker(); + + // make failover aware of the restarted auto assigned port + ((FailoverTransport) connection.getTransport().narrow(FailoverTransport.class)).add(true, broker.getTransportConnectors().get(0).getPublishableConnectString()); + + TextMessage msg = (TextMessage) consumer.receive(4000); + assertNull("did not get a message when persistent=false, message: " + msg, msg); + + connection.close(); + } + + private void populateDestination(final int nbMessages, + final Destination destination, javax.jms.Connection connection) + throws JMSException { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(destination); + for (int i = 1; i <= nbMessages; i++) { + producer.send(session.createTextMessage("<hello id='" + i + "'/>")); + } + producer.close(); + session.close(); + } + + + public static Test suite() { + return suite(DurablePersistentFalseRestartTest.class); + } + + public static void main(String[] args) { + junit.textui.TestRunner.run(suite()); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/JdbcXARecoveryBrokerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/JdbcXARecoveryBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/JdbcXARecoveryBrokerTest.java new file mode 100644 index 0000000..5788dad --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/JdbcXARecoveryBrokerTest.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +import junit.framework.Test; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; +import org.apache.derby.jdbc.EmbeddedDataSource; +import org.apache.derby.jdbc.EmbeddedXADataSource; + +public class JdbcXARecoveryBrokerTest extends XARecoveryBrokerTest { + + EmbeddedXADataSource dataSource; + + @Override + protected void setUp() throws Exception { + dataSource = new EmbeddedXADataSource(); + dataSource.setDatabaseName("derbyDb"); + dataSource.setCreateDatabase("create"); + super.setUp(); + } + + @Override + protected void tearDown() throws Exception { + super.tearDown(); + stopDerby(); + } + + @Override + protected void configureBroker(BrokerService broker) throws Exception { + super.configureBroker(broker); + + JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter(); + jdbc.setDataSource(dataSource); + broker.setPersistenceAdapter(jdbc); + } + + @Override + protected void restartBroker() throws Exception { + broker.stop(); + stopDerby(); + dataSource = new EmbeddedXADataSource(); + dataSource.setDatabaseName("derbyDb"); + dataSource.setCreateDatabase("create"); + + broker = createRestartedBroker(); + broker.start(); + } + + private void stopDerby() { + LOG.info("STOPPING DB!@!!!!"); + final EmbeddedDataSource ds = dataSource; + try { + ds.setShutdownDatabase("shutdown"); + ds.getConnection(); + } catch (Exception ignored) { + } + + } + + public static Test suite() { + return suite(JdbcXARecoveryBrokerTest.class); + } + + public static void main(String[] args) { + junit.textui.TestRunner.run(suite()); + } + + @Override + protected ActiveMQDestination createDestination() { + return new ActiveMQQueue("test,special"); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/Main.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/Main.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/Main.java new file mode 100644 index 0000000..9e1fa5e --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/Main.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +import javax.jms.Connection; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.jmx.ManagementContext; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.demo.DefaultQueueSender; + +/** + * A helper class which can be handy for running a broker in your IDE from the + * activemq-core module. + * + * + */ +public final class Main { + protected static boolean createConsumers; + + private Main() { + } + + /** + * @param args + */ + public static void main(String[] args) { + try { + BrokerService broker = new BrokerService(); + broker.setPersistent(false); + + // String brokerDir = "xbean:...; + // System.setProperty("activemq.base", brokerDir); + // BrokerService broker = BrokerFactory.createBroker(new URI(brokerDir + "/activemq.xml")); + + // for running on Java 5 without mx4j + ManagementContext managementContext = broker.getManagementContext(); + managementContext.setFindTigerMbeanServer(true); + managementContext.setUseMBeanServer(true); + managementContext.setCreateConnector(false); + + broker.setUseJmx(true); + // broker.setPlugins(new BrokerPlugin[] { new + // ConnectionDotFilePlugin(), new UDPTraceBrokerPlugin() }); + broker.addConnector("tcp://localhost:61616"); + broker.addConnector("stomp://localhost:61613"); + broker.start(); + + // lets publish some messages so that there is some stuff to browse + DefaultQueueSender.main(new String[] {"Prices.Equity.IBM"}); + DefaultQueueSender.main(new String[] {"Prices.Equity.MSFT"}); + + // lets create a dummy couple of consumers + if (createConsumers) { + Connection connection = new ActiveMQConnectionFactory().createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + session.createConsumer(new ActiveMQQueue("Orders.IBM")); + session.createConsumer(new ActiveMQQueue("Orders.MSFT"), "price > 100"); + Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + session2.createConsumer(new ActiveMQQueue("Orders.MSFT"), "price > 200"); + } else { + // Lets wait for the broker + broker.waitUntilStopped(); + } + } catch (Exception e) { + System.out.println("Failed: " + e); + e.printStackTrace(); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/MarshallingBrokerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/MarshallingBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/MarshallingBrokerTest.java new file mode 100644 index 0000000..3175156 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/MarshallingBrokerTest.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +import java.io.IOException; + +import junit.framework.Test; + +import org.apache.activemq.command.Command; +import org.apache.activemq.command.Response; +import org.apache.activemq.openwire.OpenWireFormat; +import org.apache.activemq.wireformat.WireFormat; + +/** + * Runs against the broker but marshals all request and response commands. + * + * + */ +public class MarshallingBrokerTest extends BrokerTest { + + public WireFormat wireFormat = new OpenWireFormat(); + + public void initCombos() { + + OpenWireFormat wf1 = new OpenWireFormat(); + wf1.setCacheEnabled(false); + OpenWireFormat wf2 = new OpenWireFormat(); + wf2.setCacheEnabled(true); + + addCombinationValues("wireFormat", new Object[] {wf1, wf2, }); + } + + protected StubConnection createConnection() throws Exception { + return new StubConnection(broker) { + public Response request(Command command) throws Exception { + Response r = super.request((Command)wireFormat.unmarshal(wireFormat.marshal(command))); + if (r != null) { + r = (Response)wireFormat.unmarshal(wireFormat.marshal(r)); + } + return r; + } + + public void send(Command command) throws Exception { + super.send((Command)wireFormat.unmarshal(wireFormat.marshal(command))); + } + + protected void dispatch(Command command) throws InterruptedException, IOException { + super.dispatch((Command)wireFormat.unmarshal(wireFormat.marshal(command))); + }; + }; + } + + public static Test suite() { + return suite(MarshallingBrokerTest.class); + } + + public static void main(String[] args) { + junit.textui.TestRunner.run(suite()); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/MessageExpirationTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/MessageExpirationTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/MessageExpirationTest.java new file mode 100644 index 0000000..5c7f29d --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/MessageExpirationTest.java @@ -0,0 +1,274 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +import javax.jms.DeliveryMode; + +import junit.framework.Test; + +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.LocalTransactionId; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.SessionInfo; + +public class MessageExpirationTest extends BrokerTestSupport { + + public ActiveMQDestination destination; + public int deliveryMode = DeliveryMode.NON_PERSISTENT; + public int prefetch; + public byte destinationType = ActiveMQDestination.QUEUE_TYPE; + public boolean durableConsumer; + + protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination, int deliveryMode, int timeToLive) { + Message message = createMessage(producerInfo, destination, deliveryMode); + long now = System.currentTimeMillis(); + message.setTimestamp(now); + message.setExpiration(now + timeToLive); + return message; + } + + public void initCombosForTestMessagesWaitingForUssageDecreaseExpire() { + addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)}); + addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE), + Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)}); + } + + @Override + protected BrokerService createBroker() throws Exception { + BrokerService broker = new BrokerService(); + broker.setPersistent(false); + return broker; + } + + protected PolicyEntry getDefaultPolicy() { + PolicyEntry policy = super.getDefaultPolicy(); + // disable spooling + policy.setPendingSubscriberPolicy(new VMPendingSubscriberMessageStoragePolicy()); + // have aggressive expiry period to ensure no deadlock or clash + policy.setExpireMessagesPeriod(100); + + return policy; + } + + public void testMessagesWaitingForUsageDecreaseExpire() throws Exception { + + // Start a producer + final StubConnection connection = createConnection(); + ConnectionInfo connectionInfo = createConnectionInfo(); + SessionInfo sessionInfo = createSessionInfo(connectionInfo); + final ProducerInfo producerInfo = createProducerInfo(sessionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + connection.send(producerInfo); + + // Start a consumer.. + final StubConnection connection2 = createConnection(); + ConnectionInfo connectionInfo2 = createConnectionInfo(); + SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); + connection2.send(connectionInfo2); + connection2.send(sessionInfo2); + + destination = createDestinationInfo(connection2, connectionInfo2, destinationType); + ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination); + consumerInfo2.setPrefetchSize(1); + connection2.request(consumerInfo2); + + // Reduce the limit so that only 1 message can flow through the broker + // at a time. + broker.getSystemUsage().getMemoryUsage().setLimit(1); + + final Message m1 = createMessage(producerInfo, destination, deliveryMode); + final Message m2 = createMessage(producerInfo, destination, deliveryMode, 1000); + final Message m3 = createMessage(producerInfo, destination, deliveryMode); + final Message m4 = createMessage(producerInfo, destination, deliveryMode, 1000); + + // Produce in an async thread since the producer will be getting blocked + // by the usage manager.. + new Thread() { + public void run() { + // m1 and m3 should not expire.. but the others should. + try { + connection.send(m1); + connection.send(m2); + connection.send(m3); + connection.send(m4); + } catch (Exception e) { + e.printStackTrace(); + } + } + }.start(); + + // Make sure only 1 message was delivered due to prefetch == 1 + Message m = receiveMessage(connection2); + assertNotNull(m); + assertEquals(m1.getMessageId(), m.getMessageId()); + assertNoMessagesLeft(connection); + + // Sleep before we ack so that the messages expire on the usage manager + Thread.sleep(1500); + connection2.send(createAck(consumerInfo2, m, 1, MessageAck.STANDARD_ACK_TYPE)); + + // 2nd message received should be m3.. it should have expired 2nd + // message sent. + m = receiveMessage(connection2); + assertNotNull(m); + assertEquals(m3.getMessageId(), m.getMessageId()); + + // Sleep before we ack so that the messages expire on the usage manager + Thread.sleep(1500); + connection2.send(createAck(consumerInfo2, m, 1, MessageAck.STANDARD_ACK_TYPE)); + + // And there should be no messages left now.. + assertNoMessagesLeft(connection2); + + connection.send(closeConnectionInfo(connectionInfo)); + connection.send(closeConnectionInfo(connectionInfo2)); + } + + public void initCombosForTestMessagesInLongTransactionExpire() { + addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.PERSISTENT), Integer.valueOf(DeliveryMode.NON_PERSISTENT)}); + addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TOPIC_TYPE), + Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)}); + } + + public void testMessagesInLongTransactionExpire() throws Exception { + + // Start a producer and consumer + StubConnection connection = createConnection(); + ConnectionInfo connectionInfo = createConnectionInfo(); + SessionInfo sessionInfo = createSessionInfo(connectionInfo); + ProducerInfo producerInfo = createProducerInfo(sessionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + connection.send(producerInfo); + + destination = createDestinationInfo(connection, connectionInfo, destinationType); + + ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); + consumerInfo.setPrefetchSize(1000); + connection.send(consumerInfo); + + // Start the tx.. + LocalTransactionId txid = createLocalTransaction(sessionInfo); + connection.send(createBeginTransaction(connectionInfo, txid)); + + // m1 and m3 should not expire.. but the others should. + Message m1 = createMessage(producerInfo, destination, deliveryMode); + m1.setTransactionId(txid); + connection.send(m1); + Message m = createMessage(producerInfo, destination, deliveryMode, 1000); + m.setTransactionId(txid); + connection.send(m); + Message m3 = createMessage(producerInfo, destination, deliveryMode); + m3.setTransactionId(txid); + connection.send(m3); + m = createMessage(producerInfo, destination, deliveryMode, 1000); + m.setTransactionId(txid); + connection.send(m); + + // Sleep before we commit so that the messages expire on the commit + // list.. + Thread.sleep(1500); + connection.send(createCommitTransaction1Phase(connectionInfo, txid)); + + m = receiveMessage(connection); + assertNotNull(m); + assertEquals(m1.getMessageId(), m.getMessageId()); + connection.send(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE)); + + // 2nd message received should be m3.. it should have expired 2nd + // message sent. + m = receiveMessage(connection); + assertNotNull(m); + assertEquals(m3.getMessageId(), m.getMessageId()); + connection.send(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE)); + + // And there should be no messages left now.. + assertNoMessagesLeft(connection); + + connection.send(closeConnectionInfo(connectionInfo)); + } + + public void initCombosForTestMessagesInSubscriptionPendingListExpire() { + addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)}); + addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TOPIC_TYPE), + Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)}); + } + + public void testMessagesInSubscriptionPendingListExpire() throws Exception { + + // Start a producer and consumer + StubConnection connection = createConnection(); + ConnectionInfo connectionInfo = createConnectionInfo(); + SessionInfo sessionInfo = createSessionInfo(connectionInfo); + ProducerInfo producerInfo = createProducerInfo(sessionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + connection.send(producerInfo); + + destination = createDestinationInfo(connection, connectionInfo, destinationType); + + ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); + consumerInfo.setPrefetchSize(1); + connection.send(consumerInfo); + + // m1 and m3 should not expire.. but the others should. + Message m1 = createMessage(producerInfo, destination, deliveryMode); + connection.send(m1); + connection.send(createMessage(producerInfo, destination, deliveryMode, 1000)); + Message m3 = createMessage(producerInfo, destination, deliveryMode); + connection.send(m3); + connection.send(createMessage(producerInfo, destination, deliveryMode, 1000)); + + // Make sure only 1 message was delivered due to prefetch == 1 + Message m = receiveMessage(connection); + assertNotNull(m); + assertEquals(m1.getMessageId(), m.getMessageId()); + assertNoMessagesLeft(connection); + + // Sleep before we ack so that the messages expire on the pending list.. + Thread.sleep(1500); + connection.send(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE)); + + // 2nd message received should be m3.. it should have expired 2nd + // message sent. + m = receiveMessage(connection); + assertNotNull(m); + assertEquals(m3.getMessageId(), m.getMessageId()); + connection.send(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE)); + + // And there should be no messages left now.. + assertNoMessagesLeft(connection); + + connection.send(closeConnectionInfo(connectionInfo)); + } + + public static Test suite() { + return suite(MessageExpirationTest.class); + } + + public static void main(String[] args) { + junit.textui.TestRunner.run(suite()); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/NioQueueSubscriptionTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/NioQueueSubscriptionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/NioQueueSubscriptionTest.java new file mode 100644 index 0000000..898256c --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/NioQueueSubscriptionTest.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.BlockJUnit4ClassRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.ConnectionFactory; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + +@RunWith(BlockJUnit4ClassRunner.class) +public class NioQueueSubscriptionTest extends QueueSubscriptionTest { + + protected static final Logger LOG = LoggerFactory.getLogger(NioQueueSubscriptionTest.class); + + private final Map<Thread, Throwable> exceptions = Collections.synchronizedMap(new HashMap<Thread, Throwable>()); + + @Override + protected ConnectionFactory createConnectionFactory() throws Exception { + return new ActiveMQConnectionFactory("tcp://localhost:62621?trace=false"); + } + + + @Override + protected BrokerService createBroker() throws Exception { + BrokerService answer = BrokerFactory.createBroker(new URI( + "broker://nio://localhost:62621?useQueueForAccept=false&persistent=false&wiewformat.maxInactivityDuration=0")); + answer.getManagementContext().setCreateConnector(false); + answer.setUseJmx(false); + answer.setDeleteAllMessagesOnStartup(true); + final List<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>(); + final PolicyEntry entry = new PolicyEntry(); + entry.setQueue(">"); + entry.setOptimizedDispatch(true); + policyEntries.add(entry); + + final PolicyMap policyMap = new PolicyMap(); + policyMap.setPolicyEntries(policyEntries); + answer.setDestinationPolicy(policyMap); + return answer; + } + + + @Ignore("See AMQ-4286") + @Test(timeout = 60 * 1000) + public void testLotsOfConcurrentConnections() throws Exception { + ExecutorService executor = Executors.newCachedThreadPool(); + final ConnectionFactory factory = createConnectionFactory(); + int connectionCount = 400; + final AtomicInteger threadId = new AtomicInteger(0); + for (int i = 0; i < connectionCount; i++) { + executor.execute(new Runnable() { + @Override + public void run() { + final int innerId = threadId.incrementAndGet(); + try { + ExceptionListener listener = new NioQueueSubscriptionTestListener(innerId, exceptions, LOG); + ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection(); + connection.setExceptionListener(listener); + connection.start(); + assertNotNull(connection.getBrokerName()); + connections.add(connection); + } catch (Exception e) { + LOG.error(">>>> Exception in run() on thread " + innerId, e); + exceptions.put(Thread.currentThread(), e); + } + } + }); + } + + executor.shutdown(); + executor.awaitTermination(30, TimeUnit.SECONDS); + + if (!exceptions.isEmpty()) { + LOG.error(">>>> " + exceptions.size() + " exceptions like", exceptions.values().iterator().next()); + fail("unexpected exceptions in worker threads: " + exceptions.values().iterator().next()); + } + LOG.info("created " + connectionCount + " connections"); + } +} + +class NioQueueSubscriptionTestListener implements ExceptionListener { + private int id = 0; + protected Logger LOG; + private final Map<Thread, Throwable> exceptions; + + public NioQueueSubscriptionTestListener(int id, Map<Thread, Throwable> exceptions, Logger log) { + this.id = id; + this.exceptions = exceptions; + this.LOG = log; + } + + @Override + public void onException(JMSException exception) { + LOG.error(">>>> Exception in onException() on thread " + id, exception); + exceptions.put(Thread.currentThread(), exception); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/OutOfOrderXMLTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/OutOfOrderXMLTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/OutOfOrderXMLTest.java new file mode 100644 index 0000000..11fbb56 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/OutOfOrderXMLTest.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +import java.net.URI; +import org.apache.activemq.xbean.XBeanBrokerFactory; +import org.junit.Test; + +// https://issues.apache.org/activemq/browse/AMQ-2939 +public class OutOfOrderXMLTest { + + @Test + public void verifyBrokerCreationWhenXmlOutOfOrderValidationFalse() throws Exception { + BrokerService answer = + BrokerFactory.createBroker(new URI("xbean:org/apache/activemq/broker/out-of-order-broker-elements.xml?validate=false")); + answer.stop(); + + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ProgressPrinter.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ProgressPrinter.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ProgressPrinter.java new file mode 100644 index 0000000..dcf4b6e --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ProgressPrinter.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.broker; + +public class ProgressPrinter { + + private final long total; + private final long interval; + private long percentDone; + private long counter; + + public ProgressPrinter(long total, long interval) { + this.total = total; + this.interval = interval; + } + + public synchronized void increment() { + update(++counter); + } + + public synchronized void update(long current) { + long at = 100 * current / total; + if ((percentDone / interval) != (at / interval)) { + percentDone = at; + System.out.println("Completed: " + percentDone + "%"); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/QueueMbeanRestartTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/QueueMbeanRestartTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/QueueMbeanRestartTest.java new file mode 100644 index 0000000..c004fef --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/QueueMbeanRestartTest.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.management.ObjectName; + +import org.apache.activemq.TestSupport; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.util.JMXSupport; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@RunWith(value = Parameterized.class) +public class QueueMbeanRestartTest extends TestSupport { + private static final transient Logger LOG = LoggerFactory.getLogger(QueueMbeanRestartTest.class); + + BrokerService broker; + + private final TestSupport.PersistenceAdapterChoice persistenceAdapterChoice; + + @Parameterized.Parameters + public static Collection<TestSupport.PersistenceAdapterChoice[]> getTestParameters() { + TestSupport.PersistenceAdapterChoice[] kahaDb = {TestSupport.PersistenceAdapterChoice.KahaDB}; + TestSupport.PersistenceAdapterChoice[] levelDb = {TestSupport.PersistenceAdapterChoice.LevelDB}; + TestSupport.PersistenceAdapterChoice[] jdbc = {TestSupport.PersistenceAdapterChoice.JDBC}; + List<TestSupport.PersistenceAdapterChoice[]> choices = new ArrayList<TestSupport.PersistenceAdapterChoice[]>(); + choices.add(kahaDb); + choices.add(levelDb); + choices.add(jdbc); + + return choices; + } + + public QueueMbeanRestartTest(TestSupport.PersistenceAdapterChoice choice) { + this.persistenceAdapterChoice = choice; + } + + @Before + public void setUp() throws Exception { + topic = false; + super.setUp(); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + broker.stop(); + } + + @Test(timeout = 60000) + public void testMBeanPresenceOnRestart() throws Exception { + createBroker(true); + + sendMessages(); + verifyPresenceOfQueueMbean(); + LOG.info("restart...."); + + restartBroker(); + verifyPresenceOfQueueMbean(); + } + + private void restartBroker() throws Exception { + broker.stop(); + broker.waitUntilStopped(); + Thread.sleep(5 * 1000); + createBroker(false); + broker.waitUntilStarted(); + } + + private void verifyPresenceOfQueueMbean() throws Exception { + for (ObjectName name : broker.getManagementContext().queryNames(null, null)) { + LOG.info("candidate :" + name); + String type = name.getKeyProperty("destinationType"); + if (type != null && type.equals("Queue")) { + assertEquals( + JMXSupport.encodeObjectNamePart(((ActiveMQQueue) createDestination()).getPhysicalName()), + name.getKeyProperty("destinationName")); + LOG.info("found mbbean " + name); + return; + } + } + fail("expected to find matching queue mbean for: " + createDestination()); + } + + private void sendMessages() throws Exception { + Session session = createConnection().createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(createDestination()); + producer.send(session.createTextMessage()); + } + + private void createBroker(boolean deleteAll) throws Exception { + broker = new BrokerService(); + setPersistenceAdapter(broker, persistenceAdapterChoice); + + broker.setDeleteAllMessagesOnStartup(deleteAll); + broker.start(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java new file mode 100644 index 0000000..6c3dc15 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.JmsMultipleClientsTestSupport; +import org.apache.activemq.command.ActiveMQDestination; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.BlockJUnit4ClassRunner; + +@RunWith(BlockJUnit4ClassRunner.class) +public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport { + protected int messageCount = 1000; // 1000 Messages per producer + protected int prefetchCount = 10; + + @Before + @Override + public void setUp() throws Exception { + super.setUp(); + durable = false; + topic = false; + } + + @After + @Override + public void tearDown() throws Exception { + super.tearDown(); + } + + + @Test(timeout = 60 * 1000) + public void testManyProducersOneConsumer() throws Exception { + consumerCount = 1; + producerCount = 10; + messageCount = 100; + messageSize = 1; // 1 byte + prefetchCount = 10; + + doMultipleClientsTest(); + + assertTotalMessagesReceived(messageCount * producerCount); + assertDestinationMemoryUsageGoesToZero(); + } + + @Test(timeout = 60 * 1000) + public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws Exception { + consumerCount = 2; + producerCount = 1; + messageCount = 1000; + messageSize = 1024; // 1 Kb + configurePrefetchOfOne(); + + doMultipleClientsTest(); + + assertTotalMessagesReceived(messageCount * producerCount); + assertDestinationMemoryUsageGoesToZero(); + } + + @Test(timeout = 60 * 1000) + public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception { + consumerCount = 2; + producerCount = 1; + messageCount = 1000; + prefetchCount = messageCount * 2; + messageSize = 1024; // 1 Kb + + doMultipleClientsTest(); + + assertTotalMessagesReceived(messageCount * producerCount); + assertDestinationMemoryUsageGoesToZero(); + } + + @Test(timeout = 2 * 60 * 1000) + public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws Exception { + consumerCount = 2; + producerCount = 1; + messageCount = 10; + messageSize = 1024 * 1024 * 1; // 2 MB + configurePrefetchOfOne(); + + doMultipleClientsTest(); + + assertTotalMessagesReceived(messageCount * producerCount); + assertDestinationMemoryUsageGoesToZero(); + } + + @Test(timeout = 60 * 1000) + public void testOneProducerTwoConsumersLargeMessagesLargePrefetch() throws Exception { + consumerCount = 2; + producerCount = 1; + messageCount = 10; + prefetchCount = messageCount * 2; + messageSize = 1024 * 1024 * 1; // 2 MB + + doMultipleClientsTest(); + + assertTotalMessagesReceived(messageCount * producerCount); + assertDestinationMemoryUsageGoesToZero(); + } + + @Test(timeout = 60 * 1000) + public void testOneProducerManyConsumersFewMessages() throws Exception { + consumerCount = 50; + producerCount = 1; + messageCount = 10; + messageSize = 1; // 1 byte + prefetchCount = 10; + + doMultipleClientsTest(); + + assertTotalMessagesReceived(messageCount * producerCount); + assertDestinationMemoryUsageGoesToZero(); + } + + @Test(timeout = 60 * 1000) + public void testOneProducerManyConsumersManyMessages() throws Exception { + consumerCount = 50; + producerCount = 1; + messageCount = 1000; + messageSize = 1; // 1 byte + prefetchCount = messageCount / consumerCount; + allMessagesList.setMaximumDuration(allMessagesList.getMaximumDuration() * 20); + doMultipleClientsTest(); + + assertTotalMessagesReceived(messageCount * producerCount); + assertDestinationMemoryUsageGoesToZero(); + } + + @Test(timeout = 2 * 60 * 1000) + public void testManyProducersManyConsumers() throws Exception { + consumerCount = 200; + producerCount = 50; + messageCount = 100; + messageSize = 1; // 1 byte + prefetchCount = 100; + allMessagesList.setMaximumDuration(allMessagesList.getMaximumDuration() * 20); + doMultipleClientsTest(); + + assertTotalMessagesReceived(messageCount * producerCount); + assertDestinationMemoryUsageGoesToZero(); + } + + protected void configurePrefetchOfOne() { + prefetchCount = 1; + + // this is gonna be a bit slow what with the low prefetch so bump up the + // wait time + allMessagesList.setMaximumDuration(allMessagesList.getMaximumDuration() * 20); + } + + public void doMultipleClientsTest() throws Exception { + // Create destination + final ActiveMQDestination dest = createDestination(); + + // Create consumers + ActiveMQConnectionFactory consumerFactory = (ActiveMQConnectionFactory)createConnectionFactory(); + consumerFactory.getPrefetchPolicy().setAll(prefetchCount); + + startConsumers(consumerFactory, dest); + + startProducers(dest, messageCount); + + // Wait for messages to be received. Make it proportional to the + // messages delivered. + int totalMessageCount = messageCount * producerCount; + if (dest.isTopic()) { + totalMessageCount *= consumerCount; + } + waitForAllMessagesToBeReceived(totalMessageCount); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ReconnectWithJMXEnabledTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ReconnectWithJMXEnabledTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ReconnectWithJMXEnabledTest.java new file mode 100644 index 0000000..181a907 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ReconnectWithJMXEnabledTest.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.EmbeddedBrokerTestSupport; + +/** + * + * + */ +public class ReconnectWithJMXEnabledTest extends EmbeddedBrokerTestSupport { + + protected Connection connection; + protected boolean transacted; + protected int authMode = Session.AUTO_ACKNOWLEDGE; + + public void testTestUseConnectionCloseBrokerThenRestartInSameJVM() throws Exception { + connection = connectionFactory.createConnection(); + useConnection(connection); + connection.close(); + + broker.stop(); + broker = createBroker(); + startBroker(); + + connectionFactory = createConnectionFactory(); + connection = connectionFactory.createConnection(); + useConnection(connection); + } + + protected void setUp() throws Exception { + bindAddress = "tcp://localhost:0"; + super.setUp(); + } + + @Override + protected ConnectionFactory createConnectionFactory() throws Exception { + return new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString()); + } + + protected void tearDown() throws Exception { + if (connection != null) { + connection.close(); + connection = null; + } + super.tearDown(); + } + + protected BrokerService createBroker() throws Exception { + BrokerService answer = new BrokerService(); + answer.setUseJmx(true); + answer.setPersistent(isPersistent()); + answer.addConnector(bindAddress); + return answer; + } + + protected void useConnection(Connection connection) throws Exception { + connection.setClientID("foo"); + connection.start(); + Session session = connection.createSession(transacted, authMode); + Destination destination = createDestination(); + MessageConsumer consumer = session.createConsumer(destination); + MessageProducer producer = session.createProducer(destination); + Message message = session.createTextMessage("Hello World"); + producer.send(message); + Thread.sleep(1000); + consumer.close(); + } +}
