http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e52d54e9/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java index 97cd6f6..9729793 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java @@ -32,6 +32,7 @@ import junit.framework.TestCase; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; +import org.apache.activemq.broker.region.SubscriptionStatistics; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTextMessage; @@ -53,12 +54,13 @@ import org.slf4j.LoggerFactory; /** * @author gtully * @see https://issues.apache.org/activemq/browse/AMQ-2020 - */ + **/ public class QueueDuplicatesFromStoreTest extends TestCase { + private static final Logger LOG = LoggerFactory + .getLogger(QueueDuplicatesFromStoreTest.class); - private static final Logger LOG = LoggerFactory.getLogger(QueueDuplicatesFromStoreTest.class); - - ActiveMQQueue destination = new ActiveMQQueue("queue-" + QueueDuplicatesFromStoreTest.class.getSimpleName()); + ActiveMQQueue destination = new ActiveMQQueue("queue-" + + QueueDuplicatesFromStoreTest.class.getSimpleName()); BrokerService brokerService; final static String mesageIdRoot = "11111:22222:"; @@ -89,7 +91,7 @@ public class QueueDuplicatesFromStoreTest extends TestCase { } public void testNoDuplicateAfterCacheFullAndAckedWithLargeAuditDepth() throws Exception { - doTestNoDuplicateAfterCacheFullAndAcked(1024 * 10); + doTestNoDuplicateAfterCacheFullAndAcked(1024*10); } public void testNoDuplicateAfterCacheFullAndAckedWithSmallAuditDepth() throws Exception { @@ -97,13 +99,15 @@ public class QueueDuplicatesFromStoreTest extends TestCase { } public void doTestNoDuplicateAfterCacheFullAndAcked(final int auditDepth) throws Exception { - final PersistenceAdapter persistenceAdapter = brokerService.getPersistenceAdapter(); - final MessageStore queueMessageStore = persistenceAdapter.createQueueMessageStore(destination); + final PersistenceAdapter persistenceAdapter = brokerService.getPersistenceAdapter(); + final MessageStore queueMessageStore = + persistenceAdapter.createQueueMessageStore(destination); final ConnectionContext contextNotInTx = new ConnectionContext(); final ConsumerInfo consumerInfo = new ConsumerInfo(); final DestinationStatistics destinationStatistics = new DestinationStatistics(); consumerInfo.setExclusive(true); - final Queue queue = new Queue(brokerService, destination, queueMessageStore, destinationStatistics, brokerService.getTaskRunnerFactory()); + final Queue queue = new Queue(brokerService, destination, + queueMessageStore, destinationStatistics, brokerService.getTaskRunnerFactory()); // a workaround for this issue // queue.setUseCache(false); @@ -134,34 +138,38 @@ public class QueueDuplicatesFromStoreTest extends TestCase { // pull from store in small windows Subscription subscription = new Subscription() { + private SubscriptionStatistics subscriptionStatistics = new SubscriptionStatistics(); + @Override public void add(MessageReference node) throws Exception { if (enqueueCounter.get() != node.getMessageId().getProducerSequenceId()) { - errors.add("Not in sequence at: " + enqueueCounter.get() + ", received: " + node.getMessageId().getProducerSequenceId()); + errors.add("Not in sequence at: " + enqueueCounter.get() + ", received: " + + node.getMessageId().getProducerSequenceId()); } - assertEquals("is in order", enqueueCounter.get(), node.getMessageId().getProducerSequenceId()); + assertEquals("is in order", enqueueCounter.get(), node + .getMessageId().getProducerSequenceId()); receivedLatch.countDown(); enqueueCounter.incrementAndGet(); node.decrementReferenceCount(); } @Override - public void add(ConnectionContext context, Destination destination) throws Exception { + public void add(ConnectionContext context, Destination destination) + throws Exception { } @Override public int countBeforeFull() { if (isFull()) { return 0; - } - else { + } else { return fullWindow - (int) (enqueueCounter.get() - ackedCount.get()); } } @Override public void destroy() { - } + }; @Override public void gc() { @@ -253,7 +261,8 @@ public class QueueDuplicatesFromStoreTest extends TestCase { } @Override - public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException { + public boolean matches(MessageReference node, + MessageEvaluationContext context) throws IOException { return true; } @@ -263,11 +272,13 @@ public class QueueDuplicatesFromStoreTest extends TestCase { } @Override - public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception { + public void processMessageDispatchNotification( + MessageDispatchNotification mdn) throws Exception { } @Override - public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception { + public Response pullMessage(ConnectionContext context, + MessagePull pull) throws Exception { return null; } @@ -277,7 +288,8 @@ public class QueueDuplicatesFromStoreTest extends TestCase { } @Override - public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception { + public List<MessageReference> remove(ConnectionContext context, + Destination destination) throws Exception { return null; } @@ -286,7 +298,9 @@ public class QueueDuplicatesFromStoreTest extends TestCase { } @Override - public void setSelector(String selector) throws InvalidSelectorException, UnsupportedOperationException { + public void setSelector(String selector) + throws InvalidSelectorException, + UnsupportedOperationException { } @Override @@ -294,7 +308,8 @@ public class QueueDuplicatesFromStoreTest extends TestCase { } @Override - public boolean addRecoveredMessage(ConnectionContext context, MessageReference message) throws Exception { + public boolean addRecoveredMessage(ConnectionContext context, + MessageReference message) throws Exception { return false; } @@ -304,16 +319,18 @@ public class QueueDuplicatesFromStoreTest extends TestCase { } @Override - public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception { + public void acknowledge(ConnectionContext context, MessageAck ack) + throws Exception { } @Override - public int getCursorMemoryHighWaterMark() { + public int getCursorMemoryHighWaterMark(){ return 0; } @Override - public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) { + public void setCursorMemoryHighWaterMark( + int cursorMemoryHighWaterMark) { } @Override @@ -336,14 +353,24 @@ public class QueueDuplicatesFromStoreTest extends TestCase { } @Override - public void incrementConsumedCount() { + public void incrementConsumedCount(){ } @Override - public void resetConsumedCount() { + public void resetConsumedCount(){ } + + @Override + public SubscriptionStatistics getSubscriptionStatistics() { + return subscriptionStatistics; + } + + @Override + public long getInFlightMessageSize() { + return subscriptionStatistics.getInflightMessageSize().getTotalSize(); + } }; queue.addSubscription(contextNotInTx, subscription); @@ -356,9 +383,12 @@ public class QueueDuplicatesFromStoreTest extends TestCase { for (int j = 0; j < ackBatchSize; j++, removeIndex++) { ackedCount.incrementAndGet(); MessageAck ack = new MessageAck(); - ack.setLastMessageId(new MessageId(mesageIdRoot + removeIndex)); + ack.setLastMessageId(new MessageId(mesageIdRoot + + removeIndex)); ack.setMessageCount(1); - queue.removeMessage(contextNotInTx, subscription, new IndirectMessageReference(getMessage(removeIndex)), ack); + queue.removeMessage(contextNotInTx, subscription, + new IndirectMessageReference( + getMessage(removeIndex)), ack); queue.wakeup(); } @@ -373,7 +403,8 @@ public class QueueDuplicatesFromStoreTest extends TestCase { assertTrue("There are no errors: " + errors, errors.isEmpty()); assertEquals(count, enqueueCounter.get()); - assertEquals("store count is correct", count - removeIndex, queueMessageStore.getMessageCount()); + assertEquals("store count is correct", count - removeIndex, + queueMessageStore.getMessageCount()); } private Message getMessage(int i) throws Exception {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e52d54e9/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java index b38a965..c9d0339 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java @@ -61,7 +61,6 @@ import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.state.ProducerState; import org.apache.activemq.store.MessageStore; import org.apache.activemq.thread.TaskRunnerFactory; - import junit.framework.TestCase; public class SubscriptionAddRemoveQueueTest extends TestCase { @@ -177,16 +176,20 @@ public class SubscriptionAddRemoveQueueTest extends TestCase { public class SimpleImmediateDispatchSubscription implements Subscription, LockOwner { - List<MessageReference> dispatched = Collections.synchronizedList(new ArrayList<MessageReference>()); + private SubscriptionStatistics subscriptionStatistics = new SubscriptionStatistics(); + List<MessageReference> dispatched = + Collections.synchronizedList(new ArrayList<MessageReference>()); + @Override - public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception { + public void acknowledge(ConnectionContext context, MessageAck ack) + throws Exception { } @Override public void add(MessageReference node) throws Exception { // immediate dispatch - QueueMessageReference qmr = (QueueMessageReference) node; + QueueMessageReference qmr = (QueueMessageReference)node; qmr.lock(this); dispatched.add(qmr); } @@ -400,5 +403,15 @@ public class SubscriptionAddRemoveQueueTest extends TestCase { return 10; } + @Override + public SubscriptionStatistics getSubscriptionStatistics() { + return subscriptionStatistics; + } + + @Override + public long getInFlightMessageSize() { + return subscriptionStatistics.getInflightMessageSize().getTotalSize(); + } + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e52d54e9/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java deleted file mode 100644 index ab388f0..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java +++ /dev/null @@ -1,432 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.broker.region.cursors; - -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.AutoFailTestSupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.jmx.QueueViewMBean; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.broker.region.policy.StorePendingQueueMessageStoragePolicy; -import org.apache.activemq.usage.MemoryUsage; -import org.apache.activemq.usage.StoreUsage; -import org.apache.activemq.usage.SystemUsage; -import org.apache.activemq.usage.TempUsage; -import org.apache.activemq.util.Wait; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Modified CursorSupport Unit test to reproduce the negative queue issue. - * - * Keys to reproducing: - * 1) Consecutive queues with listener on first sending to second queue - * 2) Push each queue to the memory limit - * This seems to help reproduce the issue more consistently, but - * we have seen times in our production environment where the - * negative queue can occur without. Our memory limits are - * very high in production and it still happens in varying - * frequency. - * 3) Prefetch - * Lowering the prefetch down to 10 and below seems to help - * reduce occurrences. - * 4) # of consumers per queue - * The issue occurs less with fewer consumers - * - * Things that do not affect reproduction: - * 1) Spring - we use spring in our production applications, but this test case works - * with or without it. - * 2) transacted - */ -public class NegativeQueueTest extends AutoFailTestSupport { - - private static final Logger LOG = LoggerFactory.getLogger(NegativeQueueTest.class); - - public static SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMdd,hh:mm:ss:SSS"); - - private static final String QUEUE_1_NAME = "conn.test.queue.1"; - private static final String QUEUE_2_NAME = "conn.test.queue.2"; - - private static final long QUEUE_MEMORY_LIMIT = 2097152; - private static final long MEMORY_USAGE = 400000000; - private static final long TEMP_USAGE = 200000000; - private static final long STORE_USAGE = 1000000000; - // ensure we exceed the cache 70% - private static final int MESSAGE_COUNT = 2100; - - protected static final boolean TRANSACTED = true; - protected static final boolean DEBUG = true; - protected static int NUM_CONSUMERS = 20; - protected static int PREFETCH_SIZE = 1000; - - protected BrokerService broker; - protected String bindAddress = "tcp://localhost:0"; - - public void testWithDefaultPrefetch() throws Exception { - PREFETCH_SIZE = 1000; - NUM_CONSUMERS = 20; - blastAndConsume(); - } - - public void x_testWithDefaultPrefetchFiveConsumers() throws Exception { - PREFETCH_SIZE = 1000; - NUM_CONSUMERS = 5; - blastAndConsume(); - } - - public void x_testWithDefaultPrefetchTwoConsumers() throws Exception { - PREFETCH_SIZE = 1000; - NUM_CONSUMERS = 2; - blastAndConsume(); - } - - public void testWithDefaultPrefetchOneConsumer() throws Exception { - PREFETCH_SIZE = 1000; - NUM_CONSUMERS = 1; - blastAndConsume(); - } - - public void testWithMediumPrefetch() throws Exception { - PREFETCH_SIZE = 50; - NUM_CONSUMERS = 20; - blastAndConsume(); - } - - public void x_testWithSmallPrefetch() throws Exception { - PREFETCH_SIZE = 10; - NUM_CONSUMERS = 20; - blastAndConsume(); - } - - public void testWithNoPrefetch() throws Exception { - PREFETCH_SIZE = 1; - NUM_CONSUMERS = 20; - blastAndConsume(); - } - - public void blastAndConsume() throws Exception { - LOG.info(getName()); - ConnectionFactory factory = createConnectionFactory(); - - //get proxy queues for statistics lookups - Connection proxyConnection = factory.createConnection(); - proxyConnection.start(); - Session proxySession = proxyConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final QueueViewMBean proxyQueue1 = getProxyToQueueViewMBean(proxySession.createQueue(QUEUE_1_NAME)); - final QueueViewMBean proxyQueue2 = getProxyToQueueViewMBean(proxySession.createQueue(QUEUE_2_NAME)); - - // LOAD THE QUEUE - Connection producerConnection = factory.createConnection(); - producerConnection.start(); - Session session = producerConnection.createSession(TRANSACTED, Session.AUTO_ACKNOWLEDGE); - Destination queue = session.createQueue(QUEUE_1_NAME); - MessageProducer producer = session.createProducer(queue); - List<TextMessage> senderList = new ArrayList<>(); - for (int i = 0; i < MESSAGE_COUNT; i++) { - TextMessage msg = session.createTextMessage(i + " " + formatter.format(new Date())); - senderList.add(msg); - producer.send(msg); - if (TRANSACTED) - session.commit(); - if (DEBUG && i % 100 == 0) { - int index = (i / 100) + 1; - System.out.print(index - ((index / 10) * 10)); - } - } - - //get access to the Queue info - if (DEBUG) { - System.out.println(""); - System.out.println("Queue1 Size = " + proxyQueue1.getQueueSize()); - System.out.println("Queue1 Memory % Used = " + proxyQueue1.getMemoryPercentUsage()); - System.out.println("Queue1 Memory Available = " + proxyQueue1.getMemoryLimit()); - } - - // FLUSH THE QUEUE - final CountDownLatch latch1 = new CountDownLatch(1); - final CountDownLatch latch2 = new CountDownLatch(1); - Connection[] consumerConnections1 = new Connection[NUM_CONSUMERS]; - List<Message> consumerList1 = new ArrayList<>(); - Connection[] consumerConnections2 = new Connection[NUM_CONSUMERS]; - Connection[] producerConnections2 = new Connection[NUM_CONSUMERS]; - List<Message> consumerList2 = new ArrayList<>(); - - for (int ix = 0; ix < NUM_CONSUMERS; ix++) { - producerConnections2[ix] = factory.createConnection(); - producerConnections2[ix].start(); - consumerConnections1[ix] = getConsumerConnection(factory); - Session consumerSession = consumerConnections1[ix].createSession(TRANSACTED, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = consumerSession.createConsumer(session.createQueue(QUEUE_1_NAME)); - consumer.setMessageListener(new SessionAwareMessageListener(producerConnections2[ix], consumerSession, QUEUE_2_NAME, latch1, consumerList1)); - } - - latch1.await(200000, TimeUnit.MILLISECONDS); - if (DEBUG) { - System.out.println(""); - System.out.println("Queue2 Size = " + proxyQueue2.getQueueSize()); - System.out.println("Queue2 Memory % Used = " + proxyQueue2.getMemoryPercentUsage()); - System.out.println("Queue2 Memory Available = " + proxyQueue2.getMemoryLimit()); - } - - for (int ix = 0; ix < NUM_CONSUMERS; ix++) { - consumerConnections2[ix] = getConsumerConnection(factory); - Session consumerSession = consumerConnections2[ix].createSession(TRANSACTED, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = consumerSession.createConsumer(session.createQueue(QUEUE_2_NAME)); - consumer.setMessageListener(new SessionAwareMessageListener(consumerSession, latch2, consumerList2)); - } - - boolean success = Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - boolean done = latch2.await(10, TimeUnit.SECONDS); - if (DEBUG) { - System.out.println(""); - System.out.println("Queue1 Size = " + proxyQueue1.getQueueSize()); - System.out.println("Queue1 Memory % Used = " + proxyQueue1.getMemoryPercentUsage()); - System.out.println("Queue2 Size = " + proxyQueue2.getQueueSize()); - System.out.println("Queue2 Memory % Used = " + proxyQueue2.getMemoryPercentUsage()); - System.out.println("Queue2 Memory Available = " + proxyQueue2.getMemoryLimit()); - } - return done; - } - }, 300 * 1000); - if (!success) { - dumpAllThreads("blocked waiting on 2"); - } - assertTrue("got all expected messages on 2", success); - - producerConnection.close(); - for (int ix = 0; ix < NUM_CONSUMERS; ix++) { - consumerConnections1[ix].close(); - consumerConnections2[ix].close(); - producerConnections2[ix].close(); - } - - //let the consumer statistics on queue2 have time to update - Thread.sleep(500); - - if (DEBUG) { - System.out.println(""); - System.out.println("Queue1 Size = " + proxyQueue1.getQueueSize()); - System.out.println("Queue1 Memory % Used = " + proxyQueue1.getMemoryPercentUsage()); - System.out.println("Queue2 Size = " + proxyQueue2.getQueueSize()); - System.out.println("Queue2 Memory % Used = " + proxyQueue2.getMemoryPercentUsage()); - } - - Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return 0 == proxyQueue1.getQueueSize(); - } - }); - assertEquals("Queue1 has gone negative,", 0, proxyQueue1.getQueueSize()); - - Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return 0 == proxyQueue2.getQueueSize(); - } - }); - assertEquals("Queue2 has gone negative,", 0, proxyQueue2.getQueueSize()); - proxyConnection.close(); - - } - - private QueueViewMBean getProxyToQueueViewMBean(Queue queue) throws MalformedObjectNameException, JMSException { - final String prefix = "org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName="; - - ObjectName queueViewMBeanName = new ObjectName(prefix + queue.getQueueName()); - QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true); - - return proxy; - } - - protected Connection getConsumerConnection(ConnectionFactory fac) throws JMSException { - Connection connection = fac.createConnection(); - connection.start(); - return connection; - } - - @Override - protected void setUp() throws Exception { - if (broker == null) { - broker = createBroker(); - } - super.setUp(); - } - - @Override - protected void tearDown() throws Exception { - super.tearDown(); - if (broker != null) { - broker.stop(); - broker.waitUntilStopped(); - } - } - - protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { - ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(bindAddress); - Properties props = new Properties(); - props.setProperty("prefetchPolicy.durableTopicPrefetch", "" + PREFETCH_SIZE); - props.setProperty("prefetchPolicy.optimizeDurableTopicPrefetch", "" + PREFETCH_SIZE); - props.setProperty("prefetchPolicy.queuePrefetch", "" + PREFETCH_SIZE); - cf.setProperties(props); - return cf; - } - - protected BrokerService createBroker() throws Exception { - BrokerService answer = new BrokerService(); - configureBroker(answer); - answer.start(); - answer.waitUntilStarted(); - bindAddress = answer.getTransportConnectors().get(0).getConnectUri().toString(); - return answer; - } - - protected void configureBroker(BrokerService answer) throws Exception { - PolicyEntry policy = new PolicyEntry(); - policy.setMemoryLimit(QUEUE_MEMORY_LIMIT); - policy.setPendingQueuePolicy(new StorePendingQueueMessageStoragePolicy()); - - // disable the cache to be sure setBatch is the problem - // will get lots of duplicates - // real problem is sync between cursor and store add - leads to out or order messages - // in the cursor so setBatch can break. - // policy.setUseCache(false); - - PolicyMap pMap = new PolicyMap(); - pMap.setDefaultEntry(policy); - answer.setDestinationPolicy(pMap); - answer.setDeleteAllMessagesOnStartup(true); - answer.addConnector("tcp://localhost:0"); - - MemoryUsage memoryUsage = new MemoryUsage(); - memoryUsage.setLimit(MEMORY_USAGE); - memoryUsage.setPercentUsageMinDelta(20); - - TempUsage tempUsage = new TempUsage(); - tempUsage.setLimit(TEMP_USAGE); - - StoreUsage storeUsage = new StoreUsage(); - storeUsage.setLimit(STORE_USAGE); - - SystemUsage systemUsage = new SystemUsage(); - systemUsage.setMemoryUsage(memoryUsage); - systemUsage.setTempUsage(tempUsage); - systemUsage.setStoreUsage(storeUsage); - answer.setSystemUsage(systemUsage); - } - - /** - * Message listener that is given the Session for transacted consumers - */ - class SessionAwareMessageListener implements MessageListener { - - private final List<Message> consumerList; - private final CountDownLatch latch; - private final Session consumerSession; - private Session producerSession; - private MessageProducer producer; - - public SessionAwareMessageListener(Session consumerSession, CountDownLatch latch, List<Message> consumerList) { - this(null, consumerSession, null, latch, consumerList); - } - - public SessionAwareMessageListener(Connection producerConnection, - Session consumerSession, - String outQueueName, - CountDownLatch latch, - List<Message> consumerList) { - this.consumerList = consumerList; - this.latch = latch; - this.consumerSession = consumerSession; - - if (producerConnection != null) { - try { - producerSession = producerConnection.createSession(TRANSACTED, Session.AUTO_ACKNOWLEDGE); - Destination queue = producerSession.createQueue(outQueueName); - producer = producerSession.createProducer(queue); - } - catch (JMSException e) { - e.printStackTrace(); - } - } - } - - @Override - public void onMessage(Message msg) { - try { - if (producer == null) { - // sleep to act as a slow consumer - // which will force a mix of direct and polled dispatching - // using the cursor on the broker - Thread.sleep(50); - } - else { - producer.send(msg); - if (TRANSACTED) - producerSession.commit(); - } - } - catch (Exception e) { - e.printStackTrace(); - } - - synchronized (consumerList) { - consumerList.add(msg); - if (DEBUG && consumerList.size() % 100 == 0) { - int index = consumerList.size() / 100; - System.out.print(index - ((index / 10) * 10)); - } - if (consumerList.size() == MESSAGE_COUNT) { - latch.countDown(); - } - } - if (TRANSACTED) { - try { - consumerSession.commit(); - } - catch (JMSException e) { - e.printStackTrace(); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e52d54e9/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeQueueTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeQueueTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeQueueTest.java deleted file mode 100644 index 2d8adb5..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeQueueTest.java +++ /dev/null @@ -1,134 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.broker.virtual; - -import java.net.URI; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.activemq.EmbeddedBrokerTestSupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.spring.ConsumerBean; -import org.apache.activemq.xbean.XBeanBrokerFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * - * - */ -public class CompositeQueueTest extends EmbeddedBrokerTestSupport { - - private static final Logger LOG = LoggerFactory.getLogger(CompositeQueueTest.class); - - protected int total = 10; - protected Connection connection; - public String messageSelector1, messageSelector2 = null; - - public void testVirtualTopicCreation() throws Exception { - if (connection == null) { - connection = createConnection(); - } - connection.start(); - - ConsumerBean messageList1 = new ConsumerBean(); - ConsumerBean messageList2 = new ConsumerBean(); - messageList1.setVerbose(true); - messageList2.setVerbose(true); - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - Destination producerDestination = getProducerDestination(); - Destination destination1 = getConsumer1Dsetination(); - Destination destination2 = getConsumer2Dsetination(); - - LOG.info("Sending to: " + producerDestination); - LOG.info("Consuming from: " + destination1 + " and " + destination2); - - MessageConsumer c1 = session.createConsumer(destination1, messageSelector1); - MessageConsumer c2 = session.createConsumer(destination2, messageSelector2); - - c1.setMessageListener(messageList1); - c2.setMessageListener(messageList2); - - // create topic producer - MessageProducer producer = session.createProducer(producerDestination); - assertNotNull(producer); - - for (int i = 0; i < total; i++) { - producer.send(createMessage(session, i)); - } - - assertMessagesArrived(messageList1, messageList2); - } - - protected void assertMessagesArrived(ConsumerBean messageList1, ConsumerBean messageList2) { - messageList1.assertMessagesArrived(total); - messageList2.assertMessagesArrived(total); - } - - protected TextMessage createMessage(Session session, int i) throws JMSException { - TextMessage textMessage = session.createTextMessage("message: " + i); - if (i % 2 != 0) { - textMessage.setStringProperty("odd", "yes"); - } - else { - textMessage.setStringProperty("odd", "no"); - } - textMessage.setIntProperty("i", i); - return textMessage; - } - - protected Destination getConsumer1Dsetination() { - return new ActiveMQQueue("FOO"); - } - - protected Destination getConsumer2Dsetination() { - return new ActiveMQTopic("BAR"); - } - - protected Destination getProducerDestination() { - return new ActiveMQQueue("MY.QUEUE"); - } - - @Override - protected void tearDown() throws Exception { - if (connection != null) { - connection.close(); - } - super.tearDown(); - } - - @Override - protected BrokerService createBroker() throws Exception { - XBeanBrokerFactory factory = new XBeanBrokerFactory(); - BrokerService answer = factory.createBroker(new URI(getBrokerConfigUri())); - return answer; - } - - protected String getBrokerConfigUri() { - return "org/apache/activemq/broker/virtual/composite-queue.xml"; - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e52d54e9/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeTopicTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeTopicTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeTopicTest.java deleted file mode 100644 index 9ada103..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeTopicTest.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.broker.virtual; - -import javax.jms.Destination; - -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ActiveMQTopic; - -/** - * - * - */ -public class CompositeTopicTest extends CompositeQueueTest { - - @Override - protected Destination getConsumer1Dsetination() { - return new ActiveMQQueue("FOO"); - } - - @Override - protected Destination getConsumer2Dsetination() { - return new ActiveMQTopic("BAR"); - } - - @Override - protected Destination getProducerDestination() { - return new ActiveMQTopic("MY.TOPIC"); - } - - @Override - protected String getBrokerConfigUri() { - return "org/apache/activemq/broker/virtual/composite-topic.xml"; - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e52d54e9/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/DestinationInterceptorDurableSubTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/DestinationInterceptorDurableSubTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/DestinationInterceptorDurableSubTest.java deleted file mode 100644 index 39e9d2a..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/DestinationInterceptorDurableSubTest.java +++ /dev/null @@ -1,283 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.broker.virtual; - -import java.io.IOException; -import java.net.URI; - -import javax.jms.Connection; -import javax.jms.Session; -import javax.jms.Topic; -import javax.jms.TopicSubscriber; -import javax.management.InstanceNotFoundException; -import javax.management.MBeanServerConnection; -import javax.management.ObjectInstance; -import javax.management.ObjectName; -import javax.management.remote.JMXConnector; -import javax.management.remote.JMXConnectorFactory; -import javax.management.remote.JMXServiceURL; - -import org.apache.activemq.EmbeddedBrokerTestSupport; - -import org.apache.activemq.broker.Broker; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.broker.ProducerBrokerExchange; -import org.apache.activemq.broker.region.Destination; -import org.apache.activemq.broker.region.DestinationFilter; -import org.apache.activemq.broker.region.DestinationInterceptor; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.Message; -import org.apache.activemq.xbean.XBeanBrokerFactory; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Test for AMQ-4571. - * checks that durable subscription is fully unregistered - * when using nested destination interceptors. - */ -public class DestinationInterceptorDurableSubTest extends EmbeddedBrokerTestSupport { - - private static final transient Logger LOG = LoggerFactory.getLogger(DestinationInterceptorDurableSubTest.class); - private MBeanServerConnection mbsc = null; - public static final String JMX_CONTEXT_BASE_NAME = "org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Topic,destinationName="; - - /** - * Tests AMQ-4571. - * - * @throws Exception - */ - public void testVirtualTopicRemoval() throws Exception { - - LOG.debug("Running testVirtualTopicRemoval()"); - String clientId1 = "myId1"; - String clientId2 = "myId2"; - - Connection conn = null; - Session session = null; - - try { - assertTrue(broker.isStarted()); - - // create durable sub 1 - conn = createConnection(); - conn.setClientID(clientId1); - conn.start(); - session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - // Topic topic = session.createTopic(destination.getPhysicalName()); - TopicSubscriber sub1 = session.createDurableSubscriber((Topic) destination, clientId1); - - // create durable sub 2 - TopicSubscriber sub2 = session.createDurableSubscriber((Topic) destination, clientId2); - - // verify two subs registered in JMX - assertSubscriptionCount(destination.getPhysicalName(), 2); - assertTrue(isSubRegisteredInJmx(destination.getPhysicalName(), clientId1)); - assertTrue(isSubRegisteredInJmx(destination.getPhysicalName(), clientId2)); - - // delete sub 1 - sub1.close(); - session.unsubscribe(clientId1); - - // verify only one sub registered in JMX - assertSubscriptionCount(destination.getPhysicalName(), 1); - assertFalse(isSubRegisteredInJmx(destination.getPhysicalName(), clientId1)); - assertTrue(isSubRegisteredInJmx(destination.getPhysicalName(), clientId2)); - - // delete sub 2 - sub2.close(); - session.unsubscribe(clientId2); - - // verify no sub registered in JMX - assertSubscriptionCount(destination.getPhysicalName(), 0); - assertFalse(isSubRegisteredInJmx(destination.getPhysicalName(), clientId1)); - assertFalse(isSubRegisteredInJmx(destination.getPhysicalName(), clientId2)); - } - finally { - session.close(); - conn.close(); - } - } - - /** - * Connects to broker using JMX - * - * @return The JMX connection - * @throws IOException in case of any errors - */ - protected MBeanServerConnection connectJMXBroker() throws IOException { - // connect to broker via JMX - JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://:1299/jmxrmi"); - JMXConnector jmxc = JMXConnectorFactory.connect(url, null); - MBeanServerConnection mbsc = jmxc.getMBeanServerConnection(); - LOG.debug("JMX connection established"); - return mbsc; - } - - /** - * Asserts that the Subscriptions JMX attribute of a topic has the expected - * count. - * - * @param topicName name of the topic destination - * @param expectedCount expected number of subscriptions - * @return - */ - protected boolean assertSubscriptionCount(String topicName, int expectedCount) { - try { - if (mbsc == null) { - mbsc = connectJMXBroker(); - } - // query broker queue size - ObjectName[] tmp = (ObjectName[]) mbsc.getAttribute(new ObjectName(JMX_CONTEXT_BASE_NAME + topicName), "Subscriptions"); - assertEquals(expectedCount, tmp.length); - } - catch (Exception ex) { - LOG.error(ex.getMessage()); - return false; - } - return true; - } - - /** - * Checks if a subscriptions for topic topicName with subName is registered in JMX - * - * @param topicName physical name of topic destination (excluding prefix 'topic://') - * @param subName name of the durable subscription - * @return true if registered, false otherwise - */ - protected boolean isSubRegisteredInJmx(String topicName, String subName) { - - try { - if (mbsc == null) { - mbsc = connectJMXBroker(); - } - - // A durable sub is registered under the Subscriptions JMX attribute of the topic and - // as its own ObjectInstance under the topic's Consumer namespace. - // AMQ-4571 only removed the latter not the former on unsubscribe(), so we need - // to check against both. - ObjectName[] names = (ObjectName[]) mbsc.getAttribute(new ObjectName(JMX_CONTEXT_BASE_NAME + topicName), "Subscriptions"); - ObjectInstance instance = mbsc.getObjectInstance(new ObjectName(JMX_CONTEXT_BASE_NAME + - topicName + - ",endpoint=Consumer,clientId=myId1,consumerId=Durable(myId1_" + - subName + - ")")); - - if (instance == null) - return false; - - for (int i = 0; i < names.length; i++) { - if (names[i].toString().contains(subName)) - return true; - } - } - catch (InstanceNotFoundException ine) { - //this may be expected so log at info level - LOG.info(ine.toString()); - return false; - } - catch (Exception ex) { - LOG.error(ex.toString()); - return false; - } - return false; - } - - @Override - protected void tearDown() throws Exception { - super.tearDown(); - } - - @Override - protected BrokerService createBroker() throws Exception { - XBeanBrokerFactory factory = new XBeanBrokerFactory(); - BrokerService answer = factory.createBroker(new URI(getBrokerConfigUri())); - - // lets disable persistence as we are a test - answer.setPersistent(false); - useTopic = true; - return answer; - } - - protected String getBrokerConfigUri() { - return "org/apache/activemq/broker/virtual/virtual-topics-and-interceptor.xml"; - } - - /** - * Simple but custom topic interceptor. - * To be used for testing nested interceptors in conjunction with - * virtual topic interceptor. - */ - public static class SimpleDestinationInterceptor implements DestinationInterceptor { - - private final Logger LOG = LoggerFactory.getLogger(SimpleDestinationInterceptor.class); - private BrokerService broker; - - public SimpleDestinationInterceptor() { - } - - /* (non-Javadoc) - * @see org.apache.activemq.broker.BrokerServiceAware#setBrokerService(org.apache.activemq.broker.BrokerService) - */ - public void setBrokerService(BrokerService brokerService) { - LOG.info("setBrokerService()"); - this.broker = brokerService; - } - - /* (non-Javadoc) - * @see org.apache.activemq.broker.region.DestinationInterceptor#intercept(org.apache.activemq.broker.region.Destination) - */ - @Override - public Destination intercept(final Destination destination) { - LOG.info("intercept({})", destination.getName()); - - if (!destination.getActiveMQDestination().getPhysicalName().startsWith("ActiveMQ")) { - return new DestinationFilter(destination) { - @Override - public void send(ProducerBrokerExchange context, Message message) throws Exception { - // Send message to Destination - if (LOG.isDebugEnabled()) { - LOG.debug("SimpleDestinationInterceptor: Sending message to destination:" + this.getActiveMQDestination().getPhysicalName()); - } - // message.setDestination(destination.getActiveMQDestination()); - super.send(context, message); - } - }; - } - return destination; - } - - /* (non-Javadoc) - * @see org.apache.activemq.broker.region.DestinationInterceptor#remove(org.apache.activemq.broker.region.Destination) - */ - @Override - public void remove(Destination destination) { - LOG.info("remove({})", destination.getName()); - this.broker = null; - } - - /* (non-Javadoc) - * @see org.apache.activemq.broker.region.DestinationInterceptor#create(org.apache.activemq.broker.Broker, org.apache.activemq.broker.ConnectionContext, org.apache.activemq.command.ActiveMQDestination) - */ - @Override - public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) throws Exception { - LOG.info("create(" + broker.getBrokerName() + ", " + context.toString() + ", " + destination.getPhysicalName()); - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e52d54e9/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/FilteredQueueTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/FilteredQueueTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/FilteredQueueTest.java deleted file mode 100644 index e91ae4b..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/FilteredQueueTest.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.broker.virtual; - -import org.apache.activemq.spring.ConsumerBean; - -/** - * - */ -public class FilteredQueueTest extends CompositeQueueTest { - - @Override - protected String getBrokerConfigUri() { - return "org/apache/activemq/broker/virtual/filtered-queue.xml"; - } - - @Override - protected void assertMessagesArrived(ConsumerBean messageList1, ConsumerBean messageList2) { - messageList1.assertMessagesArrived(total / 2); - messageList2.assertMessagesArrived(1); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e52d54e9/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueCorrectMemoryUsageTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueCorrectMemoryUsageTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueCorrectMemoryUsageTest.java deleted file mode 100644 index 5ca00b7..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueCorrectMemoryUsageTest.java +++ /dev/null @@ -1,167 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.broker.virtual; - -import java.util.Arrays; -import java.util.LinkedList; -import java.util.List; - -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import org.apache.activemq.EmbeddedBrokerTestSupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.DestinationInterceptor; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.broker.region.virtual.MirroredQueue; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; -import org.apache.activemq.usage.MemoryUsage; -import org.apache.activemq.usage.StoreUsage; -import org.apache.activemq.usage.SystemUsage; -import org.apache.activemq.usage.TempUsage; -import org.apache.activemq.util.IOHelper; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.util.Assert; - -/** - * This test will determine that the producer flow control does not kick in. - * The original MirroredQueue implementation was causing the queue to update - * the topic memory usage instead of the queue memory usage. - * The reason is that the message memory usage instance will not be updated - * unless it is null. This was the case when the message was initially sent - * to the topic but then it was non-null when it was being sent to the queue. - * When the region destination was set, the associated memory usage was not - * updated to the passed queue destination and thus the memory usage of the - * topic was being updated instead. - * - * @author Claudio Corsi - */ -public class MirroredQueueCorrectMemoryUsageTest extends EmbeddedBrokerTestSupport { - - private static final Logger logger = LoggerFactory.getLogger(MirroredQueueCorrectMemoryUsageTest.class); - - private static final long ONE_MB = 0x0100000; - private static final long TEN_MB = ONE_MB * 10; - private static final long TWENTY_MB = TEN_MB * 2; - - private static final String CREATED_STATIC_FOR_PERSISTENT = "created.static.for.persistent"; - - @Override - protected boolean isPersistent() { - return true; - } - - @Override - protected BrokerService createBroker() throws Exception { - // Create the broker service instance.... - BrokerService broker = super.createBroker(); - // Create and add the mirrored queue destination interceptor .... - DestinationInterceptor[] destinationInterceptors = new DestinationInterceptor[1]; - MirroredQueue mq = new MirroredQueue(); - mq.setCopyMessage(true); - mq.setPrefix(""); - mq.setPostfix(".qmirror"); - destinationInterceptors[0] = mq; - broker.setDestinationInterceptors(destinationInterceptors); - // Create the destination policy for the topics and queues - PolicyMap policyMap = new PolicyMap(); - List<PolicyEntry> entries = new LinkedList<>(); - // Create Topic policy entry - PolicyEntry policyEntry = new PolicyEntry(); - super.useTopic = true; - ActiveMQDestination destination = super.createDestination(">"); - Assert.isTrue(destination.isTopic(), "Created destination was not a topic"); - policyEntry.setDestination(destination); - policyEntry.setProducerFlowControl(true); - policyEntry.setMemoryLimit(ONE_MB); // x10 - entries.add(policyEntry); - // Create Queue policy entry - policyEntry = new PolicyEntry(); - super.useTopic = false; - destination = super.createDestination(CREATED_STATIC_FOR_PERSISTENT); - Assert.isTrue(destination.isQueue(), "Created destination was not a queue"); - policyEntry.setDestination(destination); - policyEntry.setProducerFlowControl(true); - policyEntry.setMemoryLimit(TEN_MB); - entries.add(policyEntry); - policyMap.setPolicyEntries(entries); - broker.setDestinationPolicy(policyMap); - // Set destinations - broker.setDestinations(new ActiveMQDestination[]{destination}); - // Set system usage - SystemUsage memoryManager = new SystemUsage(); - MemoryUsage memoryUsage = new MemoryUsage(); - memoryUsage.setLimit(TEN_MB); - memoryManager.setMemoryUsage(memoryUsage); - StoreUsage storeUsage = new StoreUsage(); - storeUsage.setLimit(TWENTY_MB); - memoryManager.setStoreUsage(storeUsage); - TempUsage tempDiskUsage = new TempUsage(); - tempDiskUsage.setLimit(TEN_MB); - memoryManager.setTempUsage(tempDiskUsage); - broker.setSystemUsage(memoryManager); - // Set the persistent adapter - KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter(); - persistenceAdapter.setJournalMaxFileLength((int) TEN_MB); - // Delete all current messages... - IOHelper.deleteFile(persistenceAdapter.getDirectory()); - broker.setPersistenceAdapter(persistenceAdapter); - return broker; - } - - @Override - @Before - protected void setUp() throws Exception { - super.setUp(); - } - - @Override - @After - protected void tearDown() throws Exception { - super.tearDown(); - } - - @Test(timeout = 40000) - public void testNoMemoryUsageIncreaseForTopic() throws Exception { - Connection connection = super.createConnection(); - connection.start(); - Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - Destination destination = session.createQueue(CREATED_STATIC_FOR_PERSISTENT); - MessageProducer producer = session.createProducer(destination); - producer.setDeliveryMode(DeliveryMode.PERSISTENT); - char[] m = new char[1024]; - Arrays.fill(m, 'x'); - // create some messages that have 1k each - for (int i = 1; i < 12000; i++) { - producer.send(session.createTextMessage(new String(m))); - logger.debug("Sent message: " + i); - } - producer.close(); - session.close(); - connection.stop(); - connection.close(); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e52d54e9/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueTest.java deleted file mode 100644 index 127f04c..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueTest.java +++ /dev/null @@ -1,116 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.broker.virtual; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TemporaryQueue; - -import org.apache.activemq.EmbeddedBrokerTestSupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.RegionBroker; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.spring.ConsumerBean; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * - */ -public class MirroredQueueTest extends EmbeddedBrokerTestSupport { - - private static final transient Logger LOG = LoggerFactory.getLogger(MirroredQueueTest.class); - private Connection connection; - - public void testSendingToQueueIsMirrored() throws Exception { - if (connection == null) { - connection = createConnection(); - } - connection.start(); - - ConsumerBean messageList = new ConsumerBean(); - messageList.setVerbose(true); - - Destination consumeDestination = createConsumeDestination(); - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - LOG.info("Consuming from: " + consumeDestination); - - MessageConsumer c1 = session.createConsumer(consumeDestination); - c1.setMessageListener(messageList); - - // create topic producer - ActiveMQQueue sendDestination = new ActiveMQQueue(getQueueName()); - LOG.info("Sending to: " + sendDestination); - - MessageProducer producer = session.createProducer(sendDestination); - assertNotNull(producer); - - int total = 10; - for (int i = 0; i < total; i++) { - producer.send(session.createTextMessage("message: " + i)); - } - - ///Thread.sleep(1000000); - - messageList.assertMessagesArrived(total); - - LOG.info("Received: " + messageList); - } - - public void testTempMirroredQueuesClearDown() throws Exception { - if (connection == null) { - connection = createConnection(); - } - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - TemporaryQueue tempQueue = session.createTemporaryQueue(); - RegionBroker rb = (RegionBroker) broker.getBroker().getAdaptor(RegionBroker.class); - assertTrue(rb.getDestinationMap().size() == 5); - tempQueue.delete(); - assertTrue(rb.getDestinationMap().size() == 4); - } - - protected Destination createConsumeDestination() { - return new ActiveMQTopic("VirtualTopic.Mirror." + getQueueName()); - } - - protected String getQueueName() { - return "My.Queue"; - } - - @Override - protected BrokerService createBroker() throws Exception { - BrokerService answer = new BrokerService(); - answer.setUseMirroredQueues(true); - answer.setPersistent(isPersistent()); - answer.addConnector(bindAddress); - return answer; - } - - @Override - protected void tearDown() throws Exception { - if (connection != null) { - connection.close(); - } - super.tearDown(); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e52d54e9/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueUsingVirtualTopicQueueTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueUsingVirtualTopicQueueTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueUsingVirtualTopicQueueTest.java deleted file mode 100644 index 6acaad1..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueUsingVirtualTopicQueueTest.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.broker.virtual; - -import javax.jms.Destination; - -import org.apache.activemq.command.ActiveMQQueue; - -/** - * - * - */ -public class MirroredQueueUsingVirtualTopicQueueTest extends MirroredQueueTest { - - @Override - protected Destination createConsumeDestination() { - String queueName = "Consumer.A.VirtualTopic.Mirror." + getQueueName(); - return new ActiveMQQueue(queueName); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e52d54e9/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualDestPerfTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualDestPerfTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualDestPerfTest.java deleted file mode 100644 index 85e14c7..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualDestPerfTest.java +++ /dev/null @@ -1,200 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.broker.virtual; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.LinkedHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.management.ObjectName; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.jmx.QueueViewMBean; -import org.apache.activemq.broker.region.DestinationInterceptor; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.broker.region.virtual.CompositeTopic; -import org.apache.activemq.broker.region.virtual.VirtualDestination; -import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor; -import org.apache.activemq.command.ActiveMQBytesMessage; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; -import org.apache.activemq.util.ByteSequence; -import org.junit.Ignore; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class VirtualDestPerfTest { - - private static final Logger LOG = LoggerFactory.getLogger(VirtualDestPerfTest.class); - public int messageSize = 5 * 1024; - public int messageCount = 10000; - ActiveMQTopic target = new ActiveMQTopic("target"); - BrokerService brokerService; - ActiveMQConnectionFactory connectionFactory; - - @Test - @Ignore("comparison test - 'new' no wait on future with async send broker side is always on") - public void testAsyncSendBurstToFillCache() throws Exception { - startBroker(4, true, true); - connectionFactory.setUseAsyncSend(true); - - // a burst of messages to fill the cache - messageCount = 22000; - messageSize = 10 * 1024; - - LinkedHashMap<Integer, Long> results = new LinkedHashMap<>(); - - final ActiveMQQueue queue = new ActiveMQQueue("targetQ"); - for (Integer numThreads : new Integer[]{1, 2}) { - ExecutorService executor = Executors.newFixedThreadPool(numThreads); - final AtomicLong numMessagesToSend = new AtomicLong(messageCount); - purge(); - long startTime = System.currentTimeMillis(); - for (int i = 0; i < numThreads; i++) { - executor.execute(new Runnable() { - @Override - public void run() { - try { - produceMessages(numMessagesToSend, queue); - } - catch (Exception e) { - e.printStackTrace(); - } - } - }); - } - executor.shutdown(); - executor.awaitTermination(5, TimeUnit.MINUTES); - long endTime = System.currentTimeMillis(); - long seconds = (endTime - startTime) / 1000; - LOG.info("For numThreads {} duration {}", numThreads.intValue(), seconds); - results.put(numThreads, seconds); - LOG.info("Broker got {} messages", brokerService.getAdminView().getTotalEnqueueCount()); - } - - brokerService.stop(); - brokerService.waitUntilStopped(); - LOG.info("Results: {}", results); - } - - private void purge() throws Exception { - ObjectName[] queues = brokerService.getAdminView().getQueues(); - if (queues.length == 1) { - QueueViewMBean queueViewMBean = (QueueViewMBean) brokerService.getManagementContext().newProxyInstance(queues[0], QueueViewMBean.class, false); - queueViewMBean.purge(); - } - } - - @Test - @Ignore("comparison test - takes too long and really needs a peek at the graph") - public void testPerf() throws Exception { - LinkedHashMap<Integer, Long> resultsT = new LinkedHashMap<>(); - LinkedHashMap<Integer, Long> resultsF = new LinkedHashMap<>(); - - for (int i = 2; i < 11; i++) { - for (Boolean concurrent : new Boolean[]{true, false}) { - startBroker(i, concurrent, false); - - long startTime = System.currentTimeMillis(); - produceMessages(new AtomicLong(messageCount), target); - long endTime = System.currentTimeMillis(); - long seconds = (endTime - startTime) / 1000; - LOG.info("For routes {} duration {}", i, seconds); - if (concurrent) { - resultsT.put(i, seconds); - } - else { - resultsF.put(i, seconds); - } - brokerService.stop(); - brokerService.waitUntilStopped(); - } - } - LOG.info("results T{} F{}", resultsT, resultsF); - LOG.info("http://www.chartgo.com/samples.do?chart=line&border=1&show3d=0&width=600&height=500&roundedge=1&transparency=1&legend=1&title=Send:10k::Concurrent-v-Serial&xtitle=routes&ytitle=Duration(seconds)&chrtbkgndcolor=white&threshold=0.0&lang=en" + "&xaxis1=" + toStr(resultsT.keySet()) + "&yaxis1=" + toStr(resultsT.values()) + "&group1=concurrent" + "&xaxis2=" + toStr(resultsF.keySet()) + "&yaxis2=" + toStr(resultsF.values()) + "&group2=serial" + "&from=linejsp"); - } - - private String toStr(Collection set) { - return set.toString().replace(",", "%0D%0A").replace("[", "").replace("]", "").replace(" ", ""); - } - - protected void produceMessages(AtomicLong messageCount, ActiveMQDestination destination) throws Exception { - final ByteSequence payLoad = new ByteSequence(new byte[messageSize]); - Connection connection = connectionFactory.createConnection(); - MessageProducer messageProducer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createProducer(destination); - messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT); - ActiveMQBytesMessage message = new ActiveMQBytesMessage(); - message.setContent(payLoad); - while (messageCount.decrementAndGet() >= 0) { - messageProducer.send(message); - } - connection.close(); - } - - private void startBroker(int fanoutCount, - boolean concurrentSend, - boolean concurrentStoreAndDispatchQueues) throws Exception { - brokerService = new BrokerService(); - brokerService.setDeleteAllMessagesOnStartup(true); - brokerService.setUseVirtualTopics(true); - brokerService.addConnector("tcp://0.0.0.0:0"); - brokerService.setAdvisorySupport(false); - PolicyMap destPolicyMap = new PolicyMap(); - PolicyEntry defaultEntry = new PolicyEntry(); - defaultEntry.setExpireMessagesPeriod(0); - defaultEntry.setOptimizedDispatch(true); - defaultEntry.setCursorMemoryHighWaterMark(110); - destPolicyMap.setDefaultEntry(defaultEntry); - brokerService.setDestinationPolicy(destPolicyMap); - - CompositeTopic route = new CompositeTopic(); - route.setName("target"); - route.setForwardOnly(true); - route.setConcurrentSend(concurrentSend); - Collection<ActiveMQQueue> routes = new ArrayList<>(); - for (int i = 0; i < fanoutCount; i++) { - routes.add(new ActiveMQQueue("route." + i)); - } - route.setForwardTo(routes); - VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor(); - interceptor.setVirtualDestinations(new VirtualDestination[]{route}); - brokerService.setDestinationInterceptors(new DestinationInterceptor[]{interceptor}); - brokerService.start(); - - connectionFactory = new ActiveMQConnectionFactory(brokerService.getTransportConnectors().get(0).getPublishableConnectString()); - connectionFactory.setWatchTopicAdvisories(false); - if (brokerService.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) { - - //with parallel sends and no consumers, concurrentStoreAnd dispatch, which uses a single thread by default - // will stop/impeed write batching. The num threads will need tweaking when consumers are in the mix but may introduce - // order issues - ((KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter()).setConcurrentStoreAndDispatchQueues(concurrentStoreAndDispatchQueues); - } - } -}
