http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java new file mode 100644 index 0000000..2c41673 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java @@ -0,0 +1,1246 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.broker; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import javax.jms.JMSException; +import javax.management.InstanceNotFoundException; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import junit.framework.Test; +import org.apache.activemq.broker.jmx.BrokerMBeanSupport; +import org.apache.activemq.broker.jmx.DestinationViewMBean; +import org.apache.activemq.broker.jmx.PersistenceAdapterViewMBean; +import org.apache.activemq.broker.jmx.RecoveredXATransactionViewMBean; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy; +import org.apache.activemq.command.*; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.util.JMXSupport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Used to simulate the recovery that occurs when a broker shuts down. + * + * + */ +public class XARecoveryBrokerTest extends BrokerRestartTestSupport { + protected static final Logger LOG = LoggerFactory.getLogger(XARecoveryBrokerTest.class); + public boolean prioritySupport = false; + + public void testPreparedJmxView() throws Exception { + + ActiveMQDestination destination = createDestination(); + + // Setup the producer and send the message. + StubConnection connection = createConnection(); + ConnectionInfo connectionInfo = createConnectionInfo(); + SessionInfo sessionInfo = createSessionInfo(connectionInfo); + ProducerInfo producerInfo = createProducerInfo(sessionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + connection.send(producerInfo); + ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); + connection.send(consumerInfo); + + // Prepare 4 message sends. + for (int i = 0; i < 4; i++) { + // Begin the transaction. 
+ XATransactionId txid = createXATransaction(sessionInfo); + connection.send(createBeginTransaction(connectionInfo, txid)); + + Message message = createMessage(producerInfo, destination); + message.setPersistent(true); + message.setTransactionId(txid); + connection.send(message); + + // Prepare + connection.send(createPrepareTransaction(connectionInfo, txid)); + } + + Response response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER)); + assertNotNull(response); + DataArrayResponse dar = (DataArrayResponse)response; + assertEquals(4, dar.getData().length); + + // view prepared in kahadb view + if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) { + PersistenceAdapterViewMBean kahadbView = getProxyToPersistenceAdapter(broker.getPersistenceAdapter().toString()); + String txFromView = kahadbView.getTransactions(); + LOG.info("Tx view fromm PA:" + txFromView); + assertTrue("xid with our dud format in transaction string " + txFromView, txFromView.contains("XID:[55,")); + } + + // restart the broker. 
+ restartBroker(); + + connection = createConnection(); + connectionInfo = createConnectionInfo(); + connection.send(connectionInfo); + + + response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER)); + assertNotNull(response); + dar = (DataArrayResponse)response; + assertEquals(4, dar.getData().length); + + // validate destination depth via jmx + DestinationViewMBean destinationView = getProxyToDestination(destinationList(destination)[0]); + assertEquals("enqueue count does not see prepared", 0, destinationView.getQueueSize()); + + TransactionId first = (TransactionId)dar.getData()[0]; + int commitCount = 0; + // via jmx, force outcome + for (int i = 0; i < 4; i++) { + RecoveredXATransactionViewMBean mbean = getProxyToPreparedTransactionViewMBean((TransactionId)dar.getData()[i]); + if (i%2==0) { + mbean.heuristicCommit(); + commitCount++; + } else { + mbean.heuristicRollback(); + } + } + + // verify all completed + response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER)); + assertNotNull(response); + dar = (DataArrayResponse)response; + assertEquals(0, dar.getData().length); + + // verify messages available + assertEquals("enqueue count reflects outcome", commitCount, destinationView.getQueueSize()); + + // verify mbeans gone + try { + RecoveredXATransactionViewMBean gone = getProxyToPreparedTransactionViewMBean(first); + gone.heuristicRollback(); + fail("Excepted not found"); + } catch (InstanceNotFoundException expectedNotfound) { + } + } + + private PersistenceAdapterViewMBean getProxyToPersistenceAdapter(String name) throws MalformedObjectNameException, JMSException { + return (PersistenceAdapterViewMBean)broker.getManagementContext().newProxyInstance( + BrokerMBeanSupport.createPersistenceAdapterName(broker.getBrokerObjectName().toString(), name), + PersistenceAdapterViewMBean.class, true); + } + + private RecoveredXATransactionViewMBean 
getProxyToPreparedTransactionViewMBean(TransactionId xid) throws MalformedObjectNameException, JMSException { + + ObjectName objectName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,transactionType=RecoveredXaTransaction,xid=" + + JMXSupport.encodeObjectNamePart(xid.toString())); + RecoveredXATransactionViewMBean proxy = (RecoveredXATransactionViewMBean) broker.getManagementContext().newProxyInstance(objectName, + RecoveredXATransactionViewMBean.class, true); + return proxy; + } + + private DestinationViewMBean getProxyToDestination(ActiveMQDestination destination) throws MalformedObjectNameException, JMSException { + + final ObjectName objectName = new ObjectName("org.apache.activemq:type=Broker,brokerName="+broker.getBrokerName()+",destinationType=" + + JMXSupport.encodeObjectNamePart(destination.getDestinationTypeAsString()) + + ",destinationName=" + JMXSupport.encodeObjectNamePart(destination.getPhysicalName())); + + DestinationViewMBean proxy = (DestinationViewMBean) broker.getManagementContext().newProxyInstance(objectName, + DestinationViewMBean.class, true); + return proxy; + + } + + public void testPreparedTransactionRecoveredOnRestart() throws Exception { + + ActiveMQDestination destination = createDestination(); + + // Setup the producer and send the message. + StubConnection connection = createConnection(); + ConnectionInfo connectionInfo = createConnectionInfo(); + SessionInfo sessionInfo = createSessionInfo(connectionInfo); + ProducerInfo producerInfo = createProducerInfo(sessionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + connection.send(producerInfo); + ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); + connection.send(consumerInfo); + + // Prepare 4 message sends. + for (int i = 0; i < 4; i++) { + // Begin the transaction. 
+ XATransactionId txid = createXATransaction(sessionInfo); + connection.send(createBeginTransaction(connectionInfo, txid)); + + Message message = createMessage(producerInfo, destination); + message.setPersistent(true); + message.setTransactionId(txid); + connection.send(message); + + // Prepare + connection.send(createPrepareTransaction(connectionInfo, txid)); + } + + // Since prepared but not committed.. they should not get delivered. + assertNull(receiveMessage(connection)); + assertNoMessagesLeft(connection); + connection.request(closeConnectionInfo(connectionInfo)); + + // restart the broker. + restartBroker(); + + // Setup the consumer and try receive the message. + connection = createConnection(); + connectionInfo = createConnectionInfo(); + sessionInfo = createSessionInfo(connectionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + consumerInfo = createConsumerInfo(sessionInfo, destination); + connection.send(consumerInfo); + + // Since prepared but not committed.. they should not get delivered. + assertNull(receiveMessage(connection)); + assertNoMessagesLeft(connection); + + Response response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER)); + assertNotNull(response); + DataArrayResponse dar = (DataArrayResponse)response; + assertEquals(4, dar.getData().length); + + // ensure we can close a connection with prepared transactions + connection.request(closeConnectionInfo(connectionInfo)); + + // open again to deliver outcome + connection = createConnection(); + connectionInfo = createConnectionInfo(); + sessionInfo = createSessionInfo(connectionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + consumerInfo = createConsumerInfo(sessionInfo, destination); + connection.send(consumerInfo); + + // Commit the prepared transactions. 
+ for (int i = 0; i < dar.getData().length; i++) { + TransactionId transactionId = (TransactionId) dar.getData()[i]; + LOG.info("commit: " + transactionId); + connection.request(createCommitTransaction2Phase(connectionInfo, transactionId)); + } + + // We should get the committed transactions. + final int countToReceive = expectedMessageCount(4, destination); + for (int i = 0; i < countToReceive ; i++) { + Message m = receiveMessage(connection, TimeUnit.SECONDS.toMillis(10)); + LOG.info("received: " + m); + assertNotNull("Got non null message: " + i, m); + } + + assertNoMessagesLeft(connection); + assertEmptyDLQ(); + } + + private void assertEmptyDLQ() throws Exception { + try { + DestinationViewMBean destinationView = getProxyToDestination(new ActiveMQQueue(SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME)); + assertEquals("nothing on dlq", 0, destinationView.getQueueSize()); + assertEquals("nothing added to dlq", 0, destinationView.getEnqueueCount()); + } catch (java.lang.reflect.UndeclaredThrowableException maybeOk) { + if (maybeOk.getUndeclaredThrowable() instanceof javax.management.InstanceNotFoundException) { + // perfect no dlq + } else { + throw maybeOk; + } + } + } + + public void testPreparedInterleavedTransactionRecoveredOnRestart() throws Exception { + + ActiveMQDestination destination = createDestination(); + + // Setup the producer and send the message. + StubConnection connection = createConnection(); + ConnectionInfo connectionInfo = createConnectionInfo(); + SessionInfo sessionInfo = createSessionInfo(connectionInfo); + ProducerInfo producerInfo = createProducerInfo(sessionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + connection.send(producerInfo); + ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); + connection.send(consumerInfo); + + // Prepare 4 message sends. + for (int i = 0; i < 4; i++) { + // Begin the transaction. 
+ XATransactionId txid = createXATransaction(sessionInfo); + connection.send(createBeginTransaction(connectionInfo, txid)); + + Message message = createMessage(producerInfo, destination); + message.setPersistent(true); + message.setTransactionId(txid); + connection.send(message); + + // Prepare + connection.send(createPrepareTransaction(connectionInfo, txid)); + } + + // Since prepared but not committed.. they should not get delivered. + assertNull(receiveMessage(connection)); + assertNoMessagesLeft(connection); + + // send non tx message + Message message = createMessage(producerInfo, destination); + message.setPersistent(true); + connection.request(message); + + connection.request(closeConnectionInfo(connectionInfo)); + + // restart the broker. + restartBroker(); + + // Setup the consumer and try receive the message. + connection = createConnection(); + connectionInfo = createConnectionInfo(); + sessionInfo = createSessionInfo(connectionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + consumerInfo = createConsumerInfo(sessionInfo, destination); + connection.send(consumerInfo); + + // consume non transacted message, but don't ack + int countToReceive = expectedMessageCount(1, destination); + for (int i=0; i< countToReceive; i++) { + Message m = receiveMessage(connection, TimeUnit.SECONDS.toMillis(10)); + LOG.info("received: " + m); + assertNotNull("got non tx message after prepared", m); + } + + // Since prepared but not committed.. they should not get delivered. 
+ assertNull(receiveMessage(connection)); + assertNoMessagesLeft(connection); + + Response response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER)); + assertNotNull(response); + DataArrayResponse dar = (DataArrayResponse)response; + assertEquals(4, dar.getData().length); + + // ensure we can close a connection with prepared transactions + connection.request(closeConnectionInfo(connectionInfo)); + + // open again to deliver outcome + connection = createConnection(); + connectionInfo = createConnectionInfo(); + sessionInfo = createSessionInfo(connectionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + + // Commit the prepared transactions. + for (int i = 0; i < dar.getData().length; i++) { + TransactionId transactionId = (TransactionId) dar.getData()[i]; + LOG.info("commit: " + transactionId); + connection.request(createCommitTransaction2Phase(connectionInfo, transactionId)); + } + + consumerInfo = createConsumerInfo(sessionInfo, destination); + connection.send(consumerInfo); + + // We should get the committed transactions and the non tx message + countToReceive = expectedMessageCount(5, destination); + for (int i = 0; i < countToReceive ; i++) { + Message m = receiveMessage(connection, TimeUnit.SECONDS.toMillis(10)); + LOG.info("received: " + m); + assertNotNull("Got non null message: " + i, m); + } + + assertNoMessagesLeft(connection); + assertEmptyDLQ(); + } + + public void testTopicPreparedTransactionRecoveredOnRestart() throws Exception { + ActiveMQDestination destination = new ActiveMQTopic("TryTopic"); + + StubConnection connection = createConnection(); + ConnectionInfo connectionInfo = createConnectionInfo(); + connectionInfo.setClientId("durable"); + SessionInfo sessionInfo = createSessionInfo(connectionInfo); + ProducerInfo producerInfo = createProducerInfo(sessionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + connection.send(producerInfo); + 
ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); + consumerInfo.setSubscriptionName("durable"); + connection.send(consumerInfo); + + // Prepare 4 message sends. + for (int i = 0; i < 4; i++) { + // Begin the transaction. + XATransactionId txid = createXATransaction(sessionInfo); + connection.send(createBeginTransaction(connectionInfo, txid)); + + Message message = createMessage(producerInfo, destination); + message.setPersistent(true); + message.setTransactionId(txid); + connection.send(message); + + // Prepare + connection.send(createPrepareTransaction(connectionInfo, txid)); + } + + // Since prepared but not committed.. they should not get delivered. + assertNull(receiveMessage(connection)); + assertNoMessagesLeft(connection); + connection.request(closeConnectionInfo(connectionInfo)); + + // restart the broker. + restartBroker(); + + // Setup the consumer and try receive the message. + connection = createConnection(); + connectionInfo = createConnectionInfo(); + connectionInfo.setClientId("durable"); + + sessionInfo = createSessionInfo(connectionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + consumerInfo = createConsumerInfo(sessionInfo, destination); + consumerInfo.setSubscriptionName("durable"); + connection.send(consumerInfo); + + // Since prepared but not committed.. they should not get delivered. 
+ assertNull(receiveMessage(connection)); + assertNoMessagesLeft(connection); + + Response response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER)); + assertNotNull(response); + DataArrayResponse dar = (DataArrayResponse) response; + assertEquals(4, dar.getData().length); + + // ensure we can close a connection with prepared transactions + connection.request(closeConnectionInfo(connectionInfo)); + + // open again to deliver outcome + connection = createConnection(); + connectionInfo = createConnectionInfo(); + connectionInfo.setClientId("durable"); + sessionInfo = createSessionInfo(connectionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + consumerInfo = createConsumerInfo(sessionInfo, destination); + consumerInfo.setSubscriptionName("durable"); + connection.send(consumerInfo); + + // Commit the prepared transactions. + for (int i = 0; i < dar.getData().length; i++) { + connection.request(createCommitTransaction2Phase(connectionInfo, (TransactionId) dar.getData()[i])); + } + + // We should get the committed transactions. + for (int i = 0; i < expectedMessageCount(4, destination); i++) { + Message m = receiveMessage(connection, TimeUnit.SECONDS.toMillis(10)); + assertNotNull(m); + } + + assertNoMessagesLeft(connection); + + } + + public void testQueuePersistentCommitedMessagesNotLostOnRestart() throws Exception { + + ActiveMQDestination destination = createDestination(); + + // Setup the producer and send the message. + StubConnection connection = createConnection(); + ConnectionInfo connectionInfo = createConnectionInfo(); + SessionInfo sessionInfo = createSessionInfo(connectionInfo); + ProducerInfo producerInfo = createProducerInfo(sessionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + connection.send(producerInfo); + + // Begin the transaction. 
+ XATransactionId txid = createXATransaction(sessionInfo); + connection.send(createBeginTransaction(connectionInfo, txid)); + + for (int i = 0; i < 4; i++) { + Message message = createMessage(producerInfo, destination); + message.setPersistent(true); + message.setTransactionId(txid); + connection.send(message); + } + + // Commit + connection.send(createCommitTransaction1Phase(connectionInfo, txid)); + connection.request(closeConnectionInfo(connectionInfo)); + // restart the broker. + restartBroker(); + + // Setup the consumer and receive the message. + connection = createConnection(); + connectionInfo = createConnectionInfo(); + sessionInfo = createSessionInfo(connectionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); + connection.send(consumerInfo); + + for (int i = 0; i < expectedMessageCount(4, destination); i++) { + Message m = receiveMessage(connection); + assertNotNull(m); + } + + assertNoMessagesLeft(connection); + } + + public void testQueuePersistentCommited2PhaseMessagesNotLostOnRestart() throws Exception { + + ActiveMQDestination destination = createDestination(); + + // Setup the producer and send the message. + StubConnection connection = createConnection(); + ConnectionInfo connectionInfo = createConnectionInfo(); + SessionInfo sessionInfo = createSessionInfo(connectionInfo); + ProducerInfo producerInfo = createProducerInfo(sessionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + connection.send(producerInfo); + + // Begin the transaction. 
+ XATransactionId txid = createXATransaction(sessionInfo); + connection.send(createBeginTransaction(connectionInfo, txid)); + + for (int i = 0; i < 4; i++) { + Message message = createMessage(producerInfo, destination); + message.setPersistent(true); + message.setTransactionId(txid); + connection.send(message); + } + + // Commit 2 phase + connection.request(createPrepareTransaction(connectionInfo, txid)); + connection.send(createCommitTransaction2Phase(connectionInfo, txid)); + + connection.request(closeConnectionInfo(connectionInfo)); + // restart the broker. + restartBroker(); + + // Setup the consumer and receive the message. + connection = createConnection(); + connectionInfo = createConnectionInfo(); + sessionInfo = createSessionInfo(connectionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); + connection.send(consumerInfo); + + for (int i = 0; i < expectedMessageCount(4, destination); i++) { + Message m = receiveMessage(connection); + assertNotNull(m); + } + + assertNoMessagesLeft(connection); + } + + public void testQueuePersistentCommitedAcksNotLostOnRestart() throws Exception { + + ActiveMQDestination destination = createDestination(); + + // Setup the producer and send the message. + StubConnection connection = createConnection(); + ConnectionInfo connectionInfo = createConnectionInfo(); + SessionInfo sessionInfo = createSessionInfo(connectionInfo); + ProducerInfo producerInfo = createProducerInfo(sessionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + connection.send(producerInfo); + + for (int i = 0; i < 4; i++) { + Message message = createMessage(producerInfo, destination); + message.setPersistent(true); + connection.send(message); + } + + // Begin the transaction. 
+ XATransactionId txid = createXATransaction(sessionInfo); + connection.send(createBeginTransaction(connectionInfo, txid)); + + ConsumerInfo consumerInfo; + Message m = null; + for (ActiveMQDestination dest : destinationList(destination)) { + // Setup the consumer and receive the message. + consumerInfo = createConsumerInfo(sessionInfo, dest); + connection.send(consumerInfo); + + for (int i = 0; i < 4; i++) { + m = receiveMessage(connection); + assertNotNull(m); + } + + MessageAck ack = createAck(consumerInfo, m, 4, MessageAck.STANDARD_ACK_TYPE); + ack.setTransactionId(txid); + connection.send(ack); + } + + // Commit + connection.request(createCommitTransaction1Phase(connectionInfo, txid)); + + // restart the broker. + restartBroker(); + + // Setup the consumer and receive the message. + connection = createConnection(); + connectionInfo = createConnectionInfo(); + sessionInfo = createSessionInfo(connectionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + consumerInfo = createConsumerInfo(sessionInfo, destination); + connection.send(consumerInfo); + + // No messages should be delivered. + assertNoMessagesLeft(connection); + + m = receiveMessage(connection); + assertNull(m); + } + + public void testQueuePersistentPreparedAcksNotLostOnRestart() throws Exception { + + ActiveMQDestination destination = createDestination(); + + // Setup the producer and send the message. + StubConnection connection = createConnection(); + ConnectionInfo connectionInfo = createConnectionInfo(); + SessionInfo sessionInfo = createSessionInfo(connectionInfo); + ProducerInfo producerInfo = createProducerInfo(sessionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + connection.send(producerInfo); + + for (int i = 0; i < 4; i++) { + Message message = createMessage(producerInfo, destination); + message.setPersistent(true); + connection.send(message); + } + + // Begin the transaction. 
+ XATransactionId txid = createXATransaction(sessionInfo); + connection.send(createBeginTransaction(connectionInfo, txid)); + + ConsumerInfo consumerInfo; + Message m = null; + for (ActiveMQDestination dest : destinationList(destination)) { + // Setup the consumer and receive the message. + consumerInfo = createConsumerInfo(sessionInfo, dest); + connection.send(consumerInfo); + + for (int i = 0; i < 4; i++) { + m = receiveMessage(connection); + assertNotNull(m); + } + + // one ack with last received, mimic a beforeEnd synchronization + MessageAck ack = createAck(consumerInfo, m, 4, MessageAck.STANDARD_ACK_TYPE); + ack.setTransactionId(txid); + connection.send(ack); + } + + connection.request(createPrepareTransaction(connectionInfo, txid)); + + // restart the broker. + restartBroker(); + + connection = createConnection(); + connectionInfo = createConnectionInfo(); + connection.send(connectionInfo); + + // validate recovery + TransactionInfo recoverInfo = new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER); + DataArrayResponse dataArrayResponse = (DataArrayResponse)connection.request(recoverInfo); + + assertEquals("there is a prepared tx", 1, dataArrayResponse.getData().length); + assertEquals("it matches", txid, dataArrayResponse.getData()[0]); + + sessionInfo = createSessionInfo(connectionInfo); + connection.send(sessionInfo); + consumerInfo = createConsumerInfo(sessionInfo, destination); + connection.send(consumerInfo); + + // no redelivery, exactly once semantics unless there is rollback + m = receiveMessage(connection); + assertNull(m); + assertNoMessagesLeft(connection); + + // validate destination depth via jmx + DestinationViewMBean destinationView = getProxyToDestination(destinationList(destination)[0]); + assertEquals("enqueue count does not see prepared acks", 4, destinationView.getQueueSize()); + assertEquals("enqueue count does not see prepared acks", 0, destinationView.getDequeueCount()); + + 
connection.request(createCommitTransaction2Phase(connectionInfo, txid)); + + // validate recovery complete + dataArrayResponse = (DataArrayResponse)connection.request(recoverInfo); + assertEquals("there are no prepared tx", 0, dataArrayResponse.getData().length); + + assertEquals("enqueue count does not see commited acks", 0, destinationView.getQueueSize()); + assertEquals("enqueue count does not see commited acks", 4, destinationView.getDequeueCount()); + + } + + public void initCombosForTestTopicPersistentPreparedAcksNotLostOnRestart() { + addCombinationValues("prioritySupport", new Boolean[]{Boolean.FALSE, Boolean.TRUE}); + } + + public void testTopicPersistentPreparedAcksNotLostOnRestart() throws Exception { + ActiveMQDestination destination = new ActiveMQTopic("TryTopic"); + + // Setup the producer and send the message. + StubConnection connection = createConnection(); + ConnectionInfo connectionInfo = createConnectionInfo(); + connectionInfo.setClientId("durable"); + SessionInfo sessionInfo = createSessionInfo(connectionInfo); + ProducerInfo producerInfo = createProducerInfo(sessionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + connection.send(producerInfo); + + // setup durable subs + ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); + consumerInfo.setSubscriptionName("durable"); + connection.send(consumerInfo); + + final int numMessages = 4; + for (int i = 0; i < numMessages; i++) { + Message message = createMessage(producerInfo, destination); + message.setPersistent(true); + connection.send(message); + } + + // Begin the transaction. 
+ XATransactionId txid = createXATransaction(sessionInfo); + connection.send(createBeginTransaction(connectionInfo, txid)); + + final int messageCount = expectedMessageCount(numMessages, destination); + Message m = null; + for (int i = 0; i < messageCount; i++) { + m = receiveMessage(connection); + assertNotNull("unexpected null on: " + i, m); + } + + // one ack with last received, mimic a beforeEnd synchronization + MessageAck ack = createAck(consumerInfo, m, messageCount, MessageAck.STANDARD_ACK_TYPE); + ack.setTransactionId(txid); + connection.send(ack); + + connection.request(createPrepareTransaction(connectionInfo, txid)); + + // restart the broker. + restartBroker(); + + connection = createConnection(); + connectionInfo = createConnectionInfo(); + connectionInfo.setClientId("durable"); + connection.send(connectionInfo); + + // validate recovery + TransactionInfo recoverInfo = new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER); + DataArrayResponse dataArrayResponse = (DataArrayResponse)connection.request(recoverInfo); + + assertEquals("there is a prepared tx", 1, dataArrayResponse.getData().length); + assertEquals("it matches", txid, dataArrayResponse.getData()[0]); + + sessionInfo = createSessionInfo(connectionInfo); + connection.send(sessionInfo); + consumerInfo = createConsumerInfo(sessionInfo, destination); + consumerInfo.setSubscriptionName("durable"); + connection.send(consumerInfo); + + // no redelivery, exactly once semantics unless there is rollback + m = receiveMessage(connection); + assertNull(m); + assertNoMessagesLeft(connection); + + connection.request(createCommitTransaction2Phase(connectionInfo, txid)); + + // validate recovery complete + dataArrayResponse = (DataArrayResponse)connection.request(recoverInfo); + assertEquals("there are no prepared tx", 0, dataArrayResponse.getData().length); + } + + public void testQueuePersistentPreparedAcksAvailableAfterRestartAndRollback() throws Exception { + + 
ActiveMQDestination destination = createDestination(); + + // Setup the producer and send the message. + StubConnection connection = createConnection(); + ConnectionInfo connectionInfo = createConnectionInfo(); + SessionInfo sessionInfo = createSessionInfo(connectionInfo); + ProducerInfo producerInfo = createProducerInfo(sessionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + connection.send(producerInfo); + + int numMessages = 4; + for (int i = 0; i < numMessages; i++) { + Message message = createMessage(producerInfo, destination); + message.setPersistent(true); + connection.send(message); + } + + // Begin the transaction. + XATransactionId txid = createXATransaction(sessionInfo); + connection.send(createBeginTransaction(connectionInfo, txid)); + + ConsumerInfo consumerInfo; + Message message = null; + for (ActiveMQDestination dest : destinationList(destination)) { + // Setup the consumer and receive the message. + consumerInfo = createConsumerInfo(sessionInfo, dest); + connection.send(consumerInfo); + + for (int i = 0; i < numMessages; i++) { + message = receiveMessage(connection); + assertNotNull(message); + } + + // one ack with last received, mimic a beforeEnd synchronization + MessageAck ack = createAck(consumerInfo, message, numMessages, MessageAck.STANDARD_ACK_TYPE); + ack.setTransactionId(txid); + connection.send(ack); + } + + connection.request(createPrepareTransaction(connectionInfo, txid)); + + // restart the broker. 
+ restartBroker(); + + connection = createConnection(); + connectionInfo = createConnectionInfo(); + connection.send(connectionInfo); + + // validate recovery + TransactionInfo recoverInfo = new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER); + DataArrayResponse dataArrayResponse = (DataArrayResponse)connection.request(recoverInfo); + + assertEquals("there is a prepared tx", 1, dataArrayResponse.getData().length); + assertEquals("it matches", txid, dataArrayResponse.getData()[0]); + + sessionInfo = createSessionInfo(connectionInfo); + connection.send(sessionInfo); + consumerInfo = createConsumerInfo(sessionInfo, destination); + connection.send(consumerInfo); + + // no redelivery, exactly once semantics while prepared + message = receiveMessage(connection); + assertNull(message); + assertNoMessagesLeft(connection); + + // rollback so we get redelivery + connection.request(createRollbackTransaction(connectionInfo, txid)); + + LOG.info("new tx for redelivery"); + txid = createXATransaction(sessionInfo); + connection.send(createBeginTransaction(connectionInfo, txid)); + + for (ActiveMQDestination dest : destinationList(destination)) { + // Setup the consumer and receive the message. 
+ consumerInfo = createConsumerInfo(sessionInfo, dest); + connection.send(consumerInfo); + + for (int i = 0; i < numMessages; i++) { + message = receiveMessage(connection); + assertNotNull("unexpected null on:" + i, message); + } + MessageAck ack = createAck(consumerInfo, message, numMessages, MessageAck.STANDARD_ACK_TYPE); + ack.setTransactionId(txid); + connection.send(ack); + } + + // Commit + connection.request(createCommitTransaction1Phase(connectionInfo, txid)); + + // validate recovery complete + dataArrayResponse = (DataArrayResponse)connection.request(recoverInfo); + assertEquals("there are no prepared tx", 0, dataArrayResponse.getData().length); + } + + public void testQueuePersistentPreparedAcksAvailableAfterRollbackPrefetchOne() throws Exception { + + ActiveMQDestination destination = createDestination(); + + // Setup the producer and send the message. + StubConnection connection = createConnection(); + ConnectionInfo connectionInfo = createConnectionInfo(); + SessionInfo sessionInfo = createSessionInfo(connectionInfo); + ProducerInfo producerInfo = createProducerInfo(sessionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + connection.send(producerInfo); + + int numMessages = 1; + for (int i = 0; i < numMessages; i++) { + Message message = createMessage(producerInfo, destination); + message.setPersistent(true); + connection.send(message); + } + + final int messageCount = expectedMessageCount(numMessages, destination); + + // Begin the transaction. + XATransactionId txid = createXATransaction(sessionInfo); + connection.send(createBeginTransaction(connectionInfo, txid)); + + // use consumer per destination for the composite dest case + // bc the same composite dest is used for sending so there + // will be duplicate message ids in the mix which a single + // consumer (PrefetchSubscription) cannot handle in a tx + // atm. 
The matching is based on messageId rather than messageId + // and destination + Set<ConsumerInfo> consumerInfos = new HashSet<ConsumerInfo>(); + for (ActiveMQDestination dest : destinationList(destination)) { + ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, dest); + consumerInfo.setPrefetchSize(numMessages); + consumerInfos.add(consumerInfo); + } + + for (ConsumerInfo info : consumerInfos) { + connection.send(info); + } + + Message message = null; + for (ConsumerInfo info : consumerInfos) { + for (int i = 0; i < numMessages; i++) { + message = receiveMessage(connection); + assertNotNull(message); + connection.send(createAck(info, message, 1, MessageAck.DELIVERED_ACK_TYPE)); + } + MessageAck ack = createAck(info, message, numMessages, MessageAck.STANDARD_ACK_TYPE); + ack.setTransactionId(txid); + connection.send(ack); + } + connection.request(createPrepareTransaction(connectionInfo, txid)); + + // reconnect + connection.send(connectionInfo.createRemoveCommand()); + connection = createConnection(); + connection.send(connectionInfo); + + // validate recovery + TransactionInfo recoverInfo = new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER); + DataArrayResponse dataArrayResponse = (DataArrayResponse) connection.request(recoverInfo); + + assertEquals("there is a prepared tx", 1, dataArrayResponse.getData().length); + assertEquals("it matches", txid, dataArrayResponse.getData()[0]); + + connection.send(sessionInfo); + + for (ConsumerInfo info : consumerInfos) { + connection.send(info); + } + + // no redelivery, exactly once semantics while prepared + message = receiveMessage(connection); + assertNull(message); + assertNoMessagesLeft(connection); + + // rollback so we get redelivery + connection.request(createRollbackTransaction(connectionInfo, txid)); + + LOG.info("new tx for redelivery"); + txid = createXATransaction(sessionInfo); + connection.send(createBeginTransaction(connectionInfo, txid)); + + for (ConsumerInfo info : 
consumerInfos) { + for (int i = 0; i < numMessages; i++) { + message = receiveMessage(connection); + assertNotNull("unexpected null on:" + i, message); + MessageAck ack = createAck(info, message, 1, MessageAck.STANDARD_ACK_TYPE); + ack.setTransactionId(txid); + connection.send(ack); + } + } + + // Commit + connection.request(createCommitTransaction1Phase(connectionInfo, txid)); + + // validate recovery complete + dataArrayResponse = (DataArrayResponse) connection.request(recoverInfo); + assertEquals("there are no prepared tx", 0, dataArrayResponse.getData().length); + } + + public void initCombosForTestTopicPersistentPreparedAcksAvailableAfterRestartAndRollback() { + addCombinationValues("prioritySupport", new Boolean[]{Boolean.FALSE, Boolean.TRUE}); + } + + public void testTopicPersistentPreparedAcksAvailableAfterRestartAndRollback() throws Exception { + + ActiveMQDestination destination = new ActiveMQTopic("TryTopic"); + + // Setup the producer and send the message. + StubConnection connection = createConnection(); + ConnectionInfo connectionInfo = createConnectionInfo(); + connectionInfo.setClientId("durable"); + SessionInfo sessionInfo = createSessionInfo(connectionInfo); + ProducerInfo producerInfo = createProducerInfo(sessionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + connection.send(producerInfo); + + // setup durable subs + ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); + consumerInfo.setSubscriptionName("durable"); + connection.send(consumerInfo); + + int numMessages = 4; + for (int i = 0; i < numMessages; i++) { + Message message = createMessage(producerInfo, destination); + message.setPersistent(true); + connection.send(message); + } + + // Begin the transaction. 
+ XATransactionId txid = createXATransaction(sessionInfo); + connection.send(createBeginTransaction(connectionInfo, txid)); + + Message message = null; + for (int i = 0; i < numMessages; i++) { + message = receiveMessage(connection); + assertNotNull(message); + } + + // one ack with last received, mimic a beforeEnd synchronization + MessageAck ack = createAck(consumerInfo, message, numMessages, MessageAck.STANDARD_ACK_TYPE); + ack.setTransactionId(txid); + connection.send(ack); + + connection.request(createPrepareTransaction(connectionInfo, txid)); + + // restart the broker. + restartBroker(); + + connection = createConnection(); + connectionInfo = createConnectionInfo(); + connectionInfo.setClientId("durable"); + connection.send(connectionInfo); + + // validate recovery + TransactionInfo recoverInfo = new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER); + DataArrayResponse dataArrayResponse = (DataArrayResponse)connection.request(recoverInfo); + + assertEquals("there is a prepared tx", 1, dataArrayResponse.getData().length); + assertEquals("it matches", txid, dataArrayResponse.getData()[0]); + + sessionInfo = createSessionInfo(connectionInfo); + connection.send(sessionInfo); + consumerInfo = createConsumerInfo(sessionInfo, destination); + consumerInfo.setSubscriptionName("durable"); + connection.send(consumerInfo); + + // no redelivery, exactly once semantics while prepared + message = receiveMessage(connection); + assertNull(message); + assertNoMessagesLeft(connection); + + // rollback so we get redelivery + connection.request(createRollbackTransaction(connectionInfo, txid)); + + LOG.info("new tx for redelivery"); + txid = createXATransaction(sessionInfo); + connection.send(createBeginTransaction(connectionInfo, txid)); + + for (int i = 0; i < numMessages; i++) { + message = receiveMessage(connection); + assertNotNull("unexpected null on:" + i, message); + } + ack = createAck(consumerInfo, message, numMessages, 
MessageAck.STANDARD_ACK_TYPE); + ack.setTransactionId(txid); + connection.send(ack); + + // Commit + connection.request(createCommitTransaction1Phase(connectionInfo, txid)); + + // validate recovery complete + dataArrayResponse = (DataArrayResponse)connection.request(recoverInfo); + assertEquals("there are no prepared tx", 0, dataArrayResponse.getData().length); + } + + public void initCombosForTestTopicPersistentPreparedAcksAvailableAfterRollback() { + addCombinationValues("prioritySupport", new Boolean[]{Boolean.FALSE, Boolean.TRUE}); + } + + public void testTopicPersistentPreparedAcksAvailableAfterRollback() throws Exception { + + ActiveMQDestination destination = new ActiveMQTopic("TryTopic"); + + // Setup the producer and send the message. + StubConnection connection = createConnection(); + ConnectionInfo connectionInfo = createConnectionInfo(); + connectionInfo.setClientId("durable"); + SessionInfo sessionInfo = createSessionInfo(connectionInfo); + ProducerInfo producerInfo = createProducerInfo(sessionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + connection.send(producerInfo); + + // setup durable subs + ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); + consumerInfo.setSubscriptionName("durable"); + connection.send(consumerInfo); + + int numMessages = 4; + for (int i = 0; i < numMessages; i++) { + Message message = createMessage(producerInfo, destination); + message.setPersistent(true); + connection.send(message); + } + + // Begin the transaction. 
+ XATransactionId txid = createXATransaction(sessionInfo); + connection.send(createBeginTransaction(connectionInfo, txid)); + + Message message = null; + for (int i = 0; i < numMessages; i++) { + message = receiveMessage(connection); + assertNotNull(message); + } + + // one ack with last received, mimic a beforeEnd synchronization + MessageAck ack = createAck(consumerInfo, message, numMessages, MessageAck.STANDARD_ACK_TYPE); + ack.setTransactionId(txid); + connection.send(ack); + + connection.request(createPrepareTransaction(connectionInfo, txid)); + + // rollback so we get redelivery + connection.request(createRollbackTransaction(connectionInfo, txid)); + + LOG.info("new consumer/tx for redelivery"); + connection.request(closeConnectionInfo(connectionInfo)); + + connectionInfo = createConnectionInfo(); + connectionInfo.setClientId("durable"); + sessionInfo = createSessionInfo(connectionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + + // setup durable subs + consumerInfo = createConsumerInfo(sessionInfo, destination); + consumerInfo.setSubscriptionName("durable"); + connection.send(consumerInfo); + + txid = createXATransaction(sessionInfo); + connection.send(createBeginTransaction(connectionInfo, txid)); + + for (int i = 0; i < numMessages; i++) { + message = receiveMessage(connection); + assertNotNull("unexpected null on:" + i, message); + } + ack = createAck(consumerInfo, message, numMessages, MessageAck.STANDARD_ACK_TYPE); + ack.setTransactionId(txid); + connection.send(ack); + + // Commit + connection.request(createCommitTransaction1Phase(connectionInfo, txid)); + } + + private ActiveMQDestination[] destinationList(ActiveMQDestination dest) { + return dest.isComposite() ? dest.getCompositeDestinations() : new ActiveMQDestination[]{dest}; + } + + private int expectedMessageCount(int i, ActiveMQDestination destination) { + return i * (destination.isComposite() ? 
destination.getCompositeDestinations().length : 1); + } + + public void testQueuePersistentUncommittedAcksLostOnRestart() throws Exception { + + ActiveMQDestination destination = createDestination(); + + // Setup the producer and send the message. + StubConnection connection = createConnection(); + ConnectionInfo connectionInfo = createConnectionInfo(); + SessionInfo sessionInfo = createSessionInfo(connectionInfo); + ProducerInfo producerInfo = createProducerInfo(sessionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + connection.send(producerInfo); + + for (int i = 0; i < 4; i++) { + Message message = createMessage(producerInfo, destination); + message.setPersistent(true); + connection.send(message); + } + + // Begin the transaction. + XATransactionId txid = createXATransaction(sessionInfo); + connection.send(createBeginTransaction(connectionInfo, txid)); + + Message message = null; + for (ActiveMQDestination dest : destinationList(destination)) { + // Setup the consumer and receive the message. + ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, dest); + connection.send(consumerInfo); + + for (int i = 0; i < 4; i++) { + message = receiveMessage(connection); + assertNotNull(message); + } + MessageAck ack = createAck(consumerInfo, message, 4, MessageAck.STANDARD_ACK_TYPE); + ack.setTransactionId(txid); + connection.request(ack); + } + + // Don't commit + + // restart the broker. + restartBroker(); + + // Setup the consumer and receive the message. + connection = createConnection(); + connectionInfo = createConnectionInfo(); + sessionInfo = createSessionInfo(connectionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + + for (ActiveMQDestination dest : destinationList(destination)) { + // Setup the consumer and receive the message. 
+ ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, dest); + connection.send(consumerInfo); + + for (int i = 0; i < 4; i++) { + message = receiveMessage(connection); + assertNotNull(message); + } + } + + assertNoMessagesLeft(connection); + } + + @Override + protected PolicyEntry getDefaultPolicy() { + PolicyEntry policyEntry = super.getDefaultPolicy(); + policyEntry.setPrioritizedMessages(prioritySupport); + return policyEntry; + } + + public static Test suite() { + return suite(XARecoveryBrokerTest.class); + } + + public static void main(String[] args) { + junit.textui.TestRunner.run(suite()); + } + + protected ActiveMQDestination createDestination() { + return new ActiveMQQueue(getClass().getName() + "." + getName()); + } + +}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryBrokerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryBrokerTest.java new file mode 100644 index 0000000..c65dc53 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryBrokerTest.java @@ -0,0 +1,330 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.broker.advisory; + +import junit.framework.Test; + +import org.apache.activemq.advisory.AdvisorySupport; +import org.apache.activemq.broker.BrokerTestSupport; +import org.apache.activemq.broker.StubConnection; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.RemoveInfo; +import org.apache.activemq.command.SessionInfo; + +public class AdvisoryBrokerTest extends BrokerTestSupport { + + public void testConnectionAdvisories() throws Exception { + + ActiveMQDestination destination = AdvisorySupport.getConnectionAdvisoryTopic(); + + // Setup a first connection + StubConnection connection1 = createConnection(); + ConnectionInfo connectionInfo1 = createConnectionInfo(); + SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); + ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); + consumerInfo1.setPrefetchSize(100); + + connection1.send(connectionInfo1); + connection1.send(sessionInfo1); + connection1.send(consumerInfo1); + + // We should get an advisory of our own connection. + Message m1 = receiveMessage(connection1); + assertNotNull(m1); + assertNotNull(m1.getDataStructure()); + assertEquals(((ConnectionInfo)m1.getDataStructure()).getConnectionId(), connectionInfo1.getConnectionId()); + + // Setup a second connection + StubConnection connection2 = createConnection(); + ConnectionInfo connectionInfo2 = createConnectionInfo(); + connection2.send(connectionInfo2); + + // We should get an advisory of the second connection. 
+ m1 = receiveMessage(connection1); + assertNotNull(m1); + assertNotNull(m1.getDataStructure()); + assertEquals(((ConnectionInfo)m1.getDataStructure()).getConnectionId(), connectionInfo2.getConnectionId()); + + // Close the second connection. + connection2.send(closeConnectionInfo(connectionInfo2)); + connection2.stop(); + + // We should get an advisory of the second connection closing + m1 = receiveMessage(connection1); + assertNotNull(m1); + assertNotNull(m1.getDataStructure()); + RemoveInfo r = (RemoveInfo) m1.getDataStructure(); + assertEquals(r.getObjectId(), connectionInfo2.getConnectionId()); + + assertNoMessagesLeft(connection1); + } + + /** + * Checks that a consumer on the consumer advisory topic of queue "test" gets a + * ConsumerInfo advisory when a second connection adds a consumer on that queue, + * and a RemoveInfo advisory when that second connection is closed. No advisory + * is expected for the advisory consumer itself. + */ + public void testConsumerAdvisories() throws Exception { + + ActiveMQDestination queue = new ActiveMQQueue("test"); + ActiveMQDestination destination = AdvisorySupport.getConsumerAdvisoryTopic(queue); + + // Setup a first connection + StubConnection connection1 = createConnection(); + ConnectionInfo connectionInfo1 = createConnectionInfo(); + SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); + ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); + consumerInfo1.setPrefetchSize(100); + + connection1.send(connectionInfo1); + connection1.send(sessionInfo1); + connection1.send(consumerInfo1); + + // We should not see an advisory for the advisory consumer. + assertNoMessagesLeft(connection1); + + // Setup a second consumer. + StubConnection connection2 = createConnection(); + ConnectionInfo connectionInfo2 = createConnectionInfo(); + SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); + ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, queue); + // Configure the queue consumer itself, matching testConsumerAdvisoriesReplayed. + consumerInfo2.setPrefetchSize(100); + + connection2.send(connectionInfo2); + connection2.send(sessionInfo2); + connection2.send(consumerInfo2); + + // We should get an advisory of the new consumer. 
+ Message m1 = receiveMessage(connection1); + assertNotNull(m1); + assertNotNull(m1.getDataStructure()); + assertEquals(((ConsumerInfo)m1.getDataStructure()).getConsumerId(), consumerInfo2.getConsumerId()); + + // Close the second connection. + connection2.request(closeConnectionInfo(connectionInfo2)); + connection2.stop(); + + // We should get an advisory of the consumer closing + m1 = receiveMessage(connection1); + assertNotNull(m1); + assertNotNull(m1.getDataStructure()); + RemoveInfo r = (RemoveInfo) m1.getDataStructure(); + assertEquals(r.getObjectId(), consumerInfo2.getConsumerId()); + + assertNoMessagesLeft(connection2); + } + + public void testConsumerAdvisoriesReplayed() throws Exception { + + ActiveMQDestination queue = new ActiveMQQueue("test"); + ActiveMQDestination destination = AdvisorySupport.getConsumerAdvisoryTopic(queue); + + // Setup a first connection + StubConnection connection1 = createConnection(); + ConnectionInfo connectionInfo1 = createConnectionInfo(); + SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); + + connection1.send(connectionInfo1); + connection1.send(sessionInfo1); + + // Setup a second consumer. + StubConnection connection2 = createConnection(); + ConnectionInfo connectionInfo2 = createConnectionInfo(); + SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); + ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, queue); + consumerInfo2.setPrefetchSize(100); + connection2.send(connectionInfo2); + connection2.send(sessionInfo2); + connection2.send(consumerInfo2); + + // We should get an advisory of the previous consumer. 
+ ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); + consumerInfo1.setPrefetchSize(100); + connection1.send(consumerInfo1); + + Message m1 = receiveMessage(connection1); + assertNotNull(m1); + assertNotNull(m1.getDataStructure()); + assertEquals(((ConsumerInfo)m1.getDataStructure()).getConsumerId(), consumerInfo2.getConsumerId()); + + // Close the second connection. + connection2.request(closeConnectionInfo(connectionInfo2)); + connection2.stop(); + + // We should get an advisory of the consumer closing + m1 = receiveMessage(connection1); + assertNotNull(m1); + assertNotNull(m1.getDataStructure()); + RemoveInfo r = (RemoveInfo) m1.getDataStructure(); + assertEquals(r.getObjectId(), consumerInfo2.getConsumerId()); + + assertNoMessagesLeft(connection2); + } + + + /** + * Checks that a consumer on the producer advisory topic of queue "test" gets a + * ProducerInfo advisory when a second connection registers a producer for that + * queue, and a RemoveInfo advisory when that second connection is closed. + */ + public void testProducerAdvisories() throws Exception { + + ActiveMQDestination queue = new ActiveMQQueue("test"); + ActiveMQDestination destination = AdvisorySupport.getProducerAdvisoryTopic(queue); + + // Setup a first connection + StubConnection connection1 = createConnection(); + ConnectionInfo connectionInfo1 = createConnectionInfo(); + SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); + ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); + consumerInfo1.setPrefetchSize(100); + + connection1.send(connectionInfo1); + connection1.send(sessionInfo1); + connection1.send(consumerInfo1); + + assertNoMessagesLeft(connection1); + + // Setup a producer. + StubConnection connection2 = createConnection(); + ConnectionInfo connectionInfo2 = createConnectionInfo(); + SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); + ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2); + producerInfo2.setDestination(queue); + + connection2.send(connectionInfo2); + connection2.send(sessionInfo2); + connection2.send(producerInfo2); + + // We should get an advisory of the new producer. 
+ Message m1 = receiveMessage(connection1); + assertNotNull(m1); + assertNotNull(m1.getDataStructure()); + assertEquals(((ProducerInfo)m1.getDataStructure()).getProducerId(), producerInfo2.getProducerId()); + + // Close the second connection. + connection2.request(closeConnectionInfo(connectionInfo2)); + connection2.stop(); + + // We should get an advisory of the producer closing + m1 = receiveMessage(connection1); + assertNotNull(m1); + assertNotNull(m1.getDataStructure()); + RemoveInfo r = (RemoveInfo) m1.getDataStructure(); + assertEquals(r.getObjectId(), producerInfo2.getProducerId()); + + assertNoMessagesLeft(connection2); + } + + public void testProducerAdvisoriesReplayed() throws Exception { + + ActiveMQDestination queue = new ActiveMQQueue("test"); + ActiveMQDestination destination = AdvisorySupport.getProducerAdvisoryTopic(queue); + + // Setup a first connection + StubConnection connection1 = createConnection(); + ConnectionInfo connectionInfo1 = createConnectionInfo(); + SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); + + connection1.send(connectionInfo1); + connection1.send(sessionInfo1); + + // Setup a producer. + StubConnection connection2 = createConnection(); + ConnectionInfo connectionInfo2 = createConnectionInfo(); + SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); + ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2); + producerInfo2.setDestination(queue); + + connection2.send(connectionInfo2); + connection2.send(sessionInfo2); + connection2.send(producerInfo2); + + // Create the advisory consumer.. 
it should see the previous producer + ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); + consumerInfo1.setPrefetchSize(100); + connection1.send(consumerInfo1); + + Message m1 = receiveMessage(connection1); + assertNotNull(m1); + assertNotNull(m1.getDataStructure()); + assertEquals(((ProducerInfo)m1.getDataStructure()).getProducerId(), producerInfo2.getProducerId()); + + // Close the second connection. + connection2.request(closeConnectionInfo(connectionInfo2)); + connection2.stop(); + + // We should get an advisory of the producer closing + m1 = receiveMessage(connection1); + assertNotNull(m1); + assertNotNull(m1.getDataStructure()); + RemoveInfo r = (RemoveInfo) m1.getDataStructure(); + assertEquals(r.getObjectId(), producerInfo2.getProducerId()); + + assertNoMessagesLeft(connection2); + } + + public void testProducerAdvisoriesReplayedOnlyTargetNewConsumer() throws Exception { + + ActiveMQDestination queue = new ActiveMQQueue("test"); + ActiveMQDestination destination = AdvisorySupport.getProducerAdvisoryTopic(queue); + + // Setup a first connection + StubConnection connection1 = createConnection(); + ConnectionInfo connectionInfo1 = createConnectionInfo(); + SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); + connection1.send(connectionInfo1); + connection1.send(sessionInfo1); + // Create the first consumer.. + ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination); + consumerInfo1.setPrefetchSize(100); + connection1.send(consumerInfo1); + + // Setup a producer. 
+ StubConnection connection2 = createConnection(); + ConnectionInfo connectionInfo2 = createConnectionInfo(); + SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); + ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2); + producerInfo2.setDestination(queue); + connection2.send(connectionInfo2); + connection2.send(sessionInfo2); + connection2.send(producerInfo2); + + // The first consumer should get an advisory of the new producer. + Message m1 = receiveMessage(connection1); + assertNotNull(m1); + assertNotNull(m1.getDataStructure()); + assertEquals(((ProducerInfo)m1.getDataStructure()).getProducerId(), producerInfo2.getProducerId()); + + // Create the 2nd consumer.. + ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination); + consumerInfo2.setPrefetchSize(100); + connection2.send(consumerInfo2); + + // The second consumer should see a replay + m1 = receiveMessage(connection2); + assertNotNull(m1); + assertNotNull(m1.getDataStructure()); + assertEquals(((ProducerInfo)m1.getDataStructure()).getProducerId(), producerInfo2.getProducerId()); + + // But the first consumer should not see the replay. 
+ assertNoMessagesLeft(connection1); + } + + public static Test suite() { + return suite(AdvisoryBrokerTest.class); + } + + public static void main(String[] args) { + junit.textui.TestRunner.run(suite()); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryDuplexNetworkBridgeTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryDuplexNetworkBridgeTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryDuplexNetworkBridgeTest.java new file mode 100644 index 0000000..e791fbe --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryDuplexNetworkBridgeTest.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.broker.advisory; + +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; + +import java.net.URI; + +public class AdvisoryDuplexNetworkBridgeTest extends AdvisoryNetworkBridgeTest { + + @Override + public void createBroker1() throws Exception { + broker1 = new BrokerService(); + broker1.setBrokerName("broker1"); + broker1.addConnector("tcp://localhost:61617"); + broker1.setUseJmx(false); + broker1.setPersistent(false); + broker1.start(); + broker1.waitUntilStarted(); + } + + @Override + public void createBroker2() throws Exception { + broker2 = BrokerFactory.createBroker(new URI("xbean:org/apache/activemq/network/duplexLocalBroker.xml")); + broker2.start(); + broker2.waitUntilStarted(); + } + + public void assertCreatedByDuplex(boolean createdByDuplex) { + assertTrue(createdByDuplex); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryJmxTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryJmxTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryJmxTest.java new file mode 100644 index 0000000..d452bed --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryJmxTest.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.advisory; + +import org.apache.activemq.EmbeddedBrokerTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.BrokerViewMBean; +import org.apache.activemq.broker.jmx.ManagementContext; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.DestinationInfo; + +import javax.jms.*; +import javax.management.MBeanServerConnection; +import javax.management.MBeanServerInvocationHandler; +import javax.management.ObjectName; +import javax.management.remote.JMXConnector; +import javax.management.remote.JMXConnectorFactory; +import javax.management.remote.JMXServiceURL; + +/** + * Verifies that adding and removing a queue through the broker's JMX view + * publishes DestinationInfo advisories on the "ActiveMQ.Advisory.Queue" topic. + */ +public class AdvisoryJmxTest extends EmbeddedBrokerTestSupport { + + /** + * Builds a broker whose management context uses JMX connector port 1199, + * the port that testCreateDeleteDestinations connects to. + */ + protected BrokerService createBroker() throws Exception { + BrokerService answer = new BrokerService(); + answer.setPersistent(isPersistent()); + answer.addConnector(bindAddress); + ManagementContext context = new ManagementContext(); + context.setConnectorPort(1199); + answer.setManagementContext(context); + return answer; + } + + /** + * Adds, removes and re-adds queue "test" over JMX and expects one DestinationInfo + * advisory per operation: operation type 0 after each add, 1 after the remove. + */ + public void testCreateDeleteDestinations() throws Exception { + JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:1199/jmxrmi"); + // JMXConnectorFactory.connect() returns an already-connected connector, so no extra connect() call is needed. + JMXConnector connector = JMXConnectorFactory.connect(url, null); + try { + MBeanServerConnection connection = connector.getMBeanServerConnection(); + ObjectName name = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost"); + BrokerViewMBean brokerMbean = (BrokerViewMBean) 
MBeanServerInvocationHandler.newProxyInstance(connection, name, BrokerViewMBean.class, true); + Connection conn = createConnection(); + try { + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = sess.createConsumer(sess.createTopic("ActiveMQ.Advisory.Queue")); + conn.start(); + Destination dest = sess.createQueue("test"); + + brokerMbean.addQueue("test"); + + ActiveMQMessage msg = (ActiveMQMessage)consumer.receive(1000); + assertNotNull(msg); + assertTrue(msg.getDataStructure() instanceof DestinationInfo); + assertEquals(((DestinationInfo)msg.getDataStructure()).getDestination(), dest); + assertEquals(((DestinationInfo)msg.getDataStructure()).getOperationType(), 0); + + brokerMbean.removeQueue("test"); + + msg = (ActiveMQMessage)consumer.receive(1000); + assertNotNull(msg); + assertTrue(msg.getDataStructure() instanceof DestinationInfo); + assertEquals(((DestinationInfo)msg.getDataStructure()).getDestination(), dest); + assertEquals(((DestinationInfo)msg.getDataStructure()).getOperationType(), 1); + + + brokerMbean.addQueue("test"); + msg = (ActiveMQMessage)consumer.receive(1000); + assertNotNull(msg); + assertTrue(msg.getDataStructure() instanceof DestinationInfo); + assertEquals(((DestinationInfo)msg.getDataStructure()).getDestination(), dest); + assertEquals(((DestinationInfo)msg.getDataStructure()).getOperationType(), 0); + } finally { + // Release the JMS connection even when an assertion fails. + conn.close(); + } + } finally { + // Release the JMX connector even when an assertion fails. + connector.close(); + } + } + + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryNetworkBridgeTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryNetworkBridgeTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryNetworkBridgeTest.java new file mode 100644 index 
0000000..da9cc6d --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryNetworkBridgeTest.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.advisory; + +import junit.framework.TestCase; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.advisory.AdvisorySupport; +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.BrokerInfo; + +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import java.net.URI; + +public class AdvisoryNetworkBridgeTest extends TestCase { + + BrokerService broker1; + BrokerService broker2; + + + public void testAdvisory() throws Exception { + createBroker1(); + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://broker1"); + Connection conn = factory.createConnection(); + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + conn.start(); + MessageConsumer consumer = sess.createConsumer(AdvisorySupport.getNetworkBridgeAdvisoryTopic()); + + 
Thread.sleep(1000); + + createBroker2(); + + ActiveMQMessage advisory = (ActiveMQMessage)consumer.receive(2000); + assertNotNull(advisory); + assertTrue(advisory.getDataStructure() instanceof BrokerInfo); + assertTrue(advisory.getBooleanProperty("started")); + assertCreatedByDuplex(advisory.getBooleanProperty("createdByDuplex")); + + broker2.stop(); + broker2.waitUntilStopped(); + + advisory = (ActiveMQMessage)consumer.receive(2000); + assertNotNull(advisory); + assertTrue(advisory.getDataStructure() instanceof BrokerInfo); + assertFalse(advisory.getBooleanProperty("started")); + + conn.close(); + } + + public void testAddConsumerLater() throws Exception { + createBroker1(); + + createBroker2(); + + Thread.sleep(1000); + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://broker1"); + Connection conn = factory.createConnection(); + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + conn.start(); + MessageConsumer consumer = sess.createConsumer(AdvisorySupport.getNetworkBridgeAdvisoryTopic()); + + ActiveMQMessage advisory = (ActiveMQMessage)consumer.receive(2000); + assertNotNull(advisory); + assertTrue(advisory.getDataStructure() instanceof BrokerInfo); + assertTrue(advisory.getBooleanProperty("started")); + assertCreatedByDuplex(advisory.getBooleanProperty("createdByDuplex")); + + broker2.stop(); + broker2.waitUntilStopped(); + + advisory = (ActiveMQMessage)consumer.receive(2000); + assertNotNull(advisory); + assertTrue(advisory.getDataStructure() instanceof BrokerInfo); + assertFalse(advisory.getBooleanProperty("started")); + + consumer = sess.createConsumer(AdvisorySupport.getNetworkBridgeAdvisoryTopic()); + advisory = (ActiveMQMessage)consumer.receive(1000); + assertNull(advisory); + + conn.close(); + + } + + public void assertCreatedByDuplex(boolean createdByDuplex) { + assertFalse(createdByDuplex); + } + + public void createBroker1() throws Exception { + broker1 = BrokerFactory.createBroker(new 
URI("xbean:org/apache/activemq/network/reconnect-broker1.xml")); + broker1.start(); + broker1.waitUntilStarted(); + } + + public void createBroker2() throws Exception { + broker2 = BrokerFactory.createBroker(new URI("xbean:org/apache/activemq/network/reconnect-broker2.xml")); + broker2.start(); + broker2.waitUntilStarted(); + } + + + @Override + protected void tearDown() throws Exception { + broker1.stop(); + broker1.waitUntilStopped(); + + broker2.stop(); + broker2.waitUntilStopped(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/destinations-on-start.xml ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/destinations-on-start.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/destinations-on-start.xml new file mode 100644 index 0000000..f0ee482 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/destinations-on-start.xml @@ -0,0 +1,39 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> + +<!-- this file can only be parsed using the xbean-spring library --> +<!-- START SNIPPET: xbean --> +<beans + xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd + http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd + http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"> + + <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" /> + + <broker xmlns="http://activemq.apache.org/schema/core"> + <destinations> + <queue physicalName="FOO.BAR" /> + <topic physicalName="SOME.TOPIC" /> + </destinations> + + </broker> + +</beans> +<!-- END SNIPPET: xbean --> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/exclusive-consumer-startup-destination.xml ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/exclusive-consumer-startup-destination.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/exclusive-consumer-startup-destination.xml new file mode 100644 index 0000000..5c10b7d --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/exclusive-consumer-startup-destination.xml @@ -0,0 +1,46 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<!-- this file can only be parsed using the xbean-spring library --> +<!-- START SNIPPET: xbean --> +<beans + xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd + http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd + http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"> + + <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" /> + + <broker xmlns="http://activemq.apache.org/schema/core"> + <destinationPolicy> + <policyMap> + <policyEntries> + <policyEntry queue="TEST.>" allConsumersExclusiveByDefault="true"/> + </policyEntries> + </policyMap> + </destinationPolicy> + <destinations> + <queue physicalName="TEST.QUEUE1"/> + <queue physicalName="TEST.QUEUE2"/> + <queue physicalName="TEST.QUEUE3"/> + </destinations> + </broker> + +</beans> +<!-- END SNIPPET: xbean -->
