http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java new file mode 100644 index 0000000..9120937 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java @@ -0,0 +1,216 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region; + +import java.io.File; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.CombinationTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy; +import org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.util.DefaultTestAppender; +import org.apache.log4j.Appender; +import org.apache.log4j.Level; +import org.apache.log4j.spi.LoggingEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class QueuePurgeTest extends CombinationTestSupport { + private static final Logger LOG = LoggerFactory.getLogger(QueuePurgeTest.class); + private static final int NUM_TO_SEND = 20000; + private final String MESSAGE_TEXT = new String(new byte[1024]); + BrokerService broker; + ConnectionFactory factory; + Connection connection; + Session session; + Queue queue; + MessageConsumer consumer; + + protected void setUp() throws Exception { + setMaxTestTime(10*60*1000); // 10 mins + setAutoFail(true); + super.setUp(); + broker = new BrokerService(); + + File testDataDir = new File("target/activemq-data/QueuePurgeTest"); + broker.setDataDirectoryFile(testDataDir); + broker.setUseJmx(true); + broker.setDeleteAllMessagesOnStartup(true); + broker.getSystemUsage().getMemoryUsage().setLimit(1024l*1024*64); + KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter(); + persistenceAdapter.setDirectory(new File(testDataDir, "kahadb")); + broker.setPersistenceAdapter(persistenceAdapter); + broker.addConnector("tcp://localhost:0"); + broker.start(); + factory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri().toString()); + connection = factory.createConnection(); + connection.start(); + } + + protected void tearDown() throws Exception { + super.tearDown(); + if (consumer != null) { + consumer.close(); + } + session.close(); + connection.stop(); + connection.close(); + broker.stop(); + } + + public void testPurgeLargeQueue() throws Exception { + applyBrokerSpoolingPolicy(); + createProducerAndSendMessages(NUM_TO_SEND); + QueueViewMBean proxy = getProxyToQueueViewMBean(); + LOG.info("purging.."); + + org.apache.log4j.Logger log4jLogger = org.apache.log4j.Logger.getLogger(org.apache.activemq.broker.region.Queue.class); + final AtomicBoolean gotPurgeLogMessage = new AtomicBoolean(false); + + Appender appender = new DefaultTestAppender() { + @Override + public void doAppend(LoggingEvent event) { + if (event.getMessage() instanceof String) { + String message = (String) event.getMessage(); + if (message.contains("purged of " + NUM_TO_SEND +" messages")) { + LOG.info("Received a log message: {} ", event.getMessage()); + gotPurgeLogMessage.set(true); + } + } + } + }; + + Level level = log4jLogger.getLevel(); + log4jLogger.setLevel(Level.INFO); + log4jLogger.addAppender(appender); + try { + + proxy.purge(); + + } finally { + log4jLogger.setLevel(level); + log4jLogger.removeAppender(appender); + } + + assertEquals("Queue size is not zero, it's " + proxy.getQueueSize(), 0, + proxy.getQueueSize()); + assertTrue("cache is disabled, temp store being used", !proxy.isCacheEnabled()); + assertTrue("got expected info purge log message", gotPurgeLogMessage.get()); + } + + public void testRepeatedExpiryProcessingOfLargeQueue() throws Exception { + applyBrokerSpoolingPolicy(); + final int expiryPeriod = 500; + applyExpiryDuration(expiryPeriod); + createProducerAndSendMessages(NUM_TO_SEND); + QueueViewMBean proxy = getProxyToQueueViewMBean(); + LOG.info("waiting for expiry to kick in a bunch of times to verify it does not blow mem"); + Thread.sleep(5000); + assertEquals("Queue size is has not changed " + proxy.getQueueSize(), NUM_TO_SEND, + proxy.getQueueSize()); + } + + + private void applyExpiryDuration(int i) { + broker.getDestinationPolicy().getDefaultEntry().setExpireMessagesPeriod(i); + } + + private void applyBrokerSpoolingPolicy() { + PolicyMap policyMap = new PolicyMap(); + PolicyEntry defaultEntry = new PolicyEntry(); + defaultEntry.setProducerFlowControl(false); + PendingQueueMessageStoragePolicy pendingQueuePolicy = new FilePendingQueueMessageStoragePolicy(); + defaultEntry.setPendingQueuePolicy(pendingQueuePolicy); + policyMap.setDefaultEntry(defaultEntry); + broker.setDestinationPolicy(policyMap); + } + + + public void testPurgeLargeQueueWithConsumer() throws Exception { + applyBrokerSpoolingPolicy(); + createProducerAndSendMessages(NUM_TO_SEND); + QueueViewMBean proxy = getProxyToQueueViewMBean(); + createConsumer(); + long start = System.currentTimeMillis(); + LOG.info("purging.."); + proxy.purge(); + LOG.info("purge done: " + (System.currentTimeMillis() - start) + "ms"); + assertEquals("Queue size is not zero, it's " + proxy.getQueueSize(), 0, + proxy.getQueueSize()); + assertEquals("usage goes to duck", 0, proxy.getMemoryPercentUsage()); + Message msg; + do { + msg = consumer.receive(1000); + if (msg != null) { + msg.acknowledge(); + } + } while (msg != null); + assertEquals("Queue size not valid", 0, proxy.getQueueSize()); + } + + private QueueViewMBean getProxyToQueueViewMBean() + throws MalformedObjectNameException, JMSException { + ObjectName queueViewMBeanName = + new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + + queue.getQueueName()); + QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext() + .newProxyInstance(queueViewMBeanName, + QueueViewMBean.class, true); + return proxy; + } + + private void createProducerAndSendMessages(int numToSend) throws Exception { + session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + queue = session.createQueue("test1"); + MessageProducer producer = session.createProducer(queue); + for (int i = 0; i < numToSend; i++) { + TextMessage message = session.createTextMessage(MESSAGE_TEXT + i); + if (i != 0 && i % 10000 == 0) { + LOG.info("sent: " + i); + } + producer.send(message); + } + producer.close(); + } + + private void createConsumer() throws Exception { + consumer = session.createConsumer(queue); + // wait for buffer fill out + Thread.sleep(5 * 1000); + for (int i = 0; i < 500; ++i) { + Message message = consumer.receive(); + message.acknowledge(); + } + } +}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueResendDuringShutdownTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueResendDuringShutdownTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueResendDuringShutdownTest.java new file mode 100644 index 0000000..0439fa8 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueResendDuringShutdownTest.java @@ -0,0 +1,250 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region; + +import java.io.File; + +import static org.junit.matchers.JUnitMatchers.containsString; +import static org.junit.Assert.*; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.broker.BrokerService; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.junit.*; + +/** + * Confirm that the broker does not resend unacknowledged messages during a broker shutdown. + */ +public class QueueResendDuringShutdownTest { + private static final Logger LOG = LoggerFactory.getLogger(QueueResendDuringShutdownTest.class); + public static final int NUM_CONNECTION_TO_TEST = 8; + + private static boolean iterationFoundFailure = false; + + private BrokerService broker; + private ActiveMQConnectionFactory factory; + private Connection[] connections; + private Connection producerConnection; + private Queue queue; + + private Object messageReceiveSync = new Object(); + private int receiveCount; + + @Before + public void setUp () throws Exception { + this.receiveCount = 0; + + this.broker = new BrokerService(); + this.broker.setPersistent(false); + this.broker.start(); + this.broker.waitUntilStarted(); + + this.factory = new ActiveMQConnectionFactory(broker.getVmConnectorURI()); + this.queue = new ActiveMQQueue("TESTQUEUE"); + + connections = new Connection[NUM_CONNECTION_TO_TEST]; + int iter = 0; + while ( iter < NUM_CONNECTION_TO_TEST ) { + this.connections[iter] = factory.createConnection(); + iter++; + } + + this.producerConnection = factory.createConnection(); + this.producerConnection.start(); + } + + @After + public void cleanup () throws Exception { + for ( Connection oneConnection : connections ) { + if ( oneConnection != null ) { + closeConnection(oneConnection); + } + } + connections = null; + + if ( this.producerConnection != null ) { + closeConnection(this.producerConnection); + this.producerConnection = null; + } + + this.broker.stop(); + this.broker.waitUntilStopped(); + } + + @Test(timeout=3000) + public void testRedeliverAtBrokerShutdownAutoAckMsgListenerIter1 () throws Throwable { + runTestIteration(); + } + + @Test(timeout=3000) + public void testRedeliverAtBrokerShutdownAutoAckMsgListenerIter2 () throws Throwable { + runTestIteration(); + } + + @Test(timeout=3000) + public void testRedeliverAtBrokerShutdownAutoAckMsgListenerIter3 () throws Throwable { + runTestIteration(); + } + + /** + * Run one iteration of the test, skipping it if a failure was found on a prior iteration since a single failure is + * enough. Also keep track of the state of failure for the iteration. + */ + protected void runTestIteration () throws Throwable { + if ( iterationFoundFailure ) { + LOG.info("skipping test iteration; failure previously detected"); + return; + } try { + testRedeliverAtBrokerShutdownAutoAckMsgListener(); + } catch ( Throwable thrown ) { + iterationFoundFailure = true; + throw thrown; + } + } + + protected void testRedeliverAtBrokerShutdownAutoAckMsgListener () throws Exception { + // Start consumers on all of the connections + for ( Connection oneConnection : connections ) { + MessageConsumer consumer = startupConsumer(oneConnection, false, Session.AUTO_ACKNOWLEDGE); + configureMessageListener(consumer); + oneConnection.start(); + } + + // Send one message to the Queue and wait a short time for the dispatch to occur. + this.sendMessage(); + waitForMessage(1000); + + // Verify one consumer received it + assertEquals(1, this.receiveCount); + + // Shutdown the broker + this.broker.stop(); + this.broker.waitUntilStopped(); + delay(100, "give queue time flush"); + + // Verify still only one consumer received it + assertEquals(1, this.receiveCount); + } + + /** + * Start a consumer on the given connection using the session transaction and acknowledge settings given. + */ + protected MessageConsumer startupConsumer (Connection conn, boolean transInd, int ackMode) + throws JMSException { + Session sess; + MessageConsumer consumer; + + sess = conn.createSession(transInd, ackMode); + consumer = sess.createConsumer(queue); + + return consumer; + } + + /** + * Mark the receipt of a message from one of the consumers. + */ + protected void messageReceived () { + synchronized ( this ) { + this.receiveCount++; + synchronized ( this.messageReceiveSync ) { + this.messageReceiveSync.notifyAll(); + } + } + } + + /** + * Setup the MessageListener for the given consumer. The listener uses a long delay on receiving the message to + * simulate the reported case of problems at shutdown caused by a message listener's connection closing while it is + * still processing. + */ + protected void configureMessageListener (MessageConsumer consumer) throws JMSException { + final MessageConsumer fConsumer = consumer; + + consumer.setMessageListener(new MessageListener() { + public void onMessage (Message msg) { + LOG.debug("got a message on consumer {}", fConsumer); + messageReceived(); + + // Delay long enough for the consumer to get closed while this delay is active. + delay(3000, "pause so connection shutdown leads to unacked message redelivery"); + } + }); + } + + /** + * Send a test message now. + */ + protected void sendMessage () throws JMSException { + Session sess = this.producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer prod = sess.createProducer(queue); + prod.send(sess.createTextMessage("X-TEST-MSG-X")); + prod.close(); + sess.close(); + } + + /** + * Close the given connection safely and log any exception caught. + */ + protected void closeConnection (Connection conn) { + try { + conn.close(); + } catch ( JMSException jmsExc ) { + LOG.info("failed to cleanup connection", jmsExc); + } + } + + /** + * Pause for the given length of time, in milliseconds, logging an interruption if one occurs. Don't try to + * recover from interrupt - the test case does not support interrupting and such an occurrence likely means the + * test is being aborted. + */ + protected void delay (long delayMs, String desc) { + try { + Thread.sleep(delayMs); + } catch ( InterruptedException intExc ) { + LOG.warn("sleep interrupted: " + desc, intExc); + } + } + + /** + * Wait up to the specified duration for a message to be received by any consumer. + */ + protected void waitForMessage (long delayMs) { + try { + synchronized ( this.messageReceiveSync ) { + if ( this.receiveCount == 0 ) { + this.messageReceiveSync.wait(delayMs); + } + } + } catch ( InterruptedException intExc ) { + LOG.warn("sleep interrupted: wait for message to arrive"); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java new file mode 100644 index 0000000..6964842 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java @@ -0,0 +1,374 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import javax.jms.InvalidSelectorException; +import javax.management.ObjectName; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.ProducerBrokerExchange; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageDispatchNotification; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.MessagePull; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.Response; +import org.apache.activemq.filter.MessageEvaluationContext; +import org.apache.activemq.state.ProducerState; +import org.apache.activemq.store.MessageStore; +import org.apache.activemq.thread.TaskRunnerFactory; +import junit.framework.TestCase; + +public class SubscriptionAddRemoveQueueTest extends TestCase { + + Queue queue; + + ConsumerInfo info = new ConsumerInfo(); + List<SimpleImmediateDispatchSubscription> subs = new ArrayList<SimpleImmediateDispatchSubscription>(); + ConnectionContext context = new ConnectionContext(); + ProducerBrokerExchange producerBrokerExchange = new ProducerBrokerExchange(); + ProducerInfo producerInfo = new ProducerInfo(); + ProducerState producerState = new ProducerState(producerInfo); + ActiveMQDestination destination = new ActiveMQQueue("TEST"); + int numSubscriptions = 1000; + boolean working = true; + int senders = 20; + + + @Override + public void setUp() throws Exception { + BrokerService brokerService = new BrokerService(); + brokerService.start(); + DestinationStatistics parentStats = new DestinationStatistics(); + parentStats.setEnabled(true); + + TaskRunnerFactory taskFactory = new TaskRunnerFactory(); + MessageStore store = null; + + info.setDestination(destination); + info.setPrefetchSize(100); + + producerBrokerExchange.setProducerState(producerState); + producerBrokerExchange.setConnectionContext(context); + + queue = new Queue(brokerService, destination, store, parentStats, taskFactory); + queue.initialize(); + } + + public void testNoDispatchToRemovedConsumers() throws Exception { + final AtomicInteger producerId = new AtomicInteger(); + Runnable sender = new Runnable() { + public void run() { + AtomicInteger id = new AtomicInteger(); + int producerIdAndIncrement = producerId.getAndIncrement(); + while (working) { + try { + Message msg = new ActiveMQMessage(); + msg.setDestination(destination); + msg.setMessageId(new MessageId(producerIdAndIncrement + ":0:" + id.getAndIncrement())); + queue.send(producerBrokerExchange, msg); + } catch (Exception e) { + e.printStackTrace(); + fail("unexpected exception in sendMessage, ex:" + e); + } + } + } + }; + + Runnable subRemover = new Runnable() { + public void run() { + for (Subscription sub : subs) { + try { + queue.removeSubscription(context, sub, 0); + } catch (Exception e) { + e.printStackTrace(); + fail("unexpected exception in removeSubscription, ex:" + e); + } + } + } + }; + + for (int i=0;i<numSubscriptions; i++) { + SimpleImmediateDispatchSubscription sub = new SimpleImmediateDispatchSubscription(); + subs.add(sub); + queue.addSubscription(context, sub); + } + assertEquals("there are X subscriptions", numSubscriptions, queue.getDestinationStatistics().getConsumers().getCount()); + ExecutorService executor = Executors.newCachedThreadPool(); + for (int i=0; i<senders ; i++) { + executor.submit(sender); + } + + Thread.sleep(1000); + for (SimpleImmediateDispatchSubscription sub : subs) { + assertTrue("There are some locked messages in the subscription", hasSomeLocks(sub.dispatched)); + } + + Future<?> result = executor.submit(subRemover); + result.get(); + working = false; + assertEquals("there are no subscriptions", 0, queue.getDestinationStatistics().getConsumers().getCount()); + + for (SimpleImmediateDispatchSubscription sub : subs) { + assertTrue("There are no locked messages in any removed subscriptions", !hasSomeLocks(sub.dispatched)); + } + + } + + private boolean hasSomeLocks(List<MessageReference> dispatched) { + boolean hasLock = false; + for (MessageReference mr: dispatched) { + QueueMessageReference qmr = (QueueMessageReference) mr; + if (qmr.getLockOwner() != null) { + hasLock = true; + break; + } + } + return hasLock; + } + + public class SimpleImmediateDispatchSubscription implements Subscription, LockOwner { + + List<MessageReference> dispatched = + Collections.synchronizedList(new ArrayList<MessageReference>()); + + public void acknowledge(ConnectionContext context, MessageAck ack) + throws Exception { + } + + public void add(MessageReference node) throws Exception { + // immediate dispatch + QueueMessageReference qmr = (QueueMessageReference)node; + qmr.lock(this); + dispatched.add(qmr); + } + + public ConnectionContext getContext() { + return null; + } + + @Override + public int getCursorMemoryHighWaterMark() { + return 0; + } + + @Override + public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) { + } + + @Override + public boolean isSlowConsumer() { + return false; + } + + @Override + public void unmatched(MessageReference node) throws IOException { + } + + @Override + public long getTimeOfLastMessageAck() { + return 0; + } + + @Override + public long getConsumedCount() { + return 0; + } + + @Override + public void incrementConsumedCount() { + } + + @Override + public void resetConsumedCount() { + } + + public void add(ConnectionContext context, Destination destination) + throws Exception { + } + + public void destroy() { + } + + public void gc() { + } + + public ConsumerInfo getConsumerInfo() { + return info; + } + + public long getDequeueCounter() { + return 0; + } + + public long getDispatchedCounter() { + return 0; + } + + public int getDispatchedQueueSize() { + return 0; + } + + public long getEnqueueCounter() { + return 0; + } + + public int getInFlightSize() { + return 0; + } + + public int getInFlightUsage() { + return 0; + } + + public ObjectName getObjectName() { + return null; + } + + public int getPendingQueueSize() { + return 0; + } + + public int getPrefetchSize() { + return 0; + } + + public String getSelector() { + return null; + } + + public boolean isBrowser() { + return false; + } + + public boolean isFull() { + return false; + } + + public boolean isHighWaterMark() { + return false; + } + + public boolean isLowWaterMark() { + return false; + } + + public boolean isRecoveryRequired() { + return false; + } + + public boolean isSlave() { + return false; + } + + public boolean matches(MessageReference node, + MessageEvaluationContext context) throws IOException { + return true; + } + + public boolean matches(ActiveMQDestination destination) { + return false; + } + + public void processMessageDispatchNotification( + MessageDispatchNotification mdn) throws Exception { + } + + public Response pullMessage(ConnectionContext context, MessagePull pull) + throws Exception { + return null; + } + + @Override + public boolean isWildcard() { + return false; + } + + public List<MessageReference> remove(ConnectionContext context, + Destination destination) throws Exception { + return new ArrayList<MessageReference>(dispatched); + } + + public void setObjectName(ObjectName objectName) { + } + + public void setSelector(String selector) + throws InvalidSelectorException, UnsupportedOperationException { + } + + public void updateConsumerPrefetch(int newPrefetch) { + } + + public boolean addRecoveredMessage(ConnectionContext context, + MessageReference message) throws Exception { + return false; + } + + public ActiveMQDestination getActiveMQDestination() { + return null; + } + + public int getLockPriority() { + return 0; + } + + public boolean isLockExclusive() { + return false; + } + + public void addDestination(Destination destination) { + } + + public void removeDestination(Destination destination) { + } + + public int countBeforeFull() { + return 10; + } + + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/UniquePropertyMessageEvictionStrategyTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/UniquePropertyMessageEvictionStrategyTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/UniquePropertyMessageEvictionStrategyTest.java new file mode 100644 index 0000000..5657e5c --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/UniquePropertyMessageEvictionStrategyTest.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.broker.region; + +import org.apache.activemq.EmbeddedBrokerTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.region.policy.UniquePropertyMessageEvictionStrategy; + +import javax.jms.*; +import java.util.ArrayList; +import java.util.List; + +public class UniquePropertyMessageEvictionStrategyTest extends EmbeddedBrokerTestSupport { + + + + @Override + protected BrokerService createBroker() throws Exception { + BrokerService broker = super.createBroker(); + final List<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>(); + final PolicyEntry entry = new PolicyEntry(); + entry.setTopic(">"); + + entry.setAdvisoryForDiscardingMessages(true); + entry.setTopicPrefetch(1); + + ConstantPendingMessageLimitStrategy pendingMessageLimitStrategy = new ConstantPendingMessageLimitStrategy(); + pendingMessageLimitStrategy.setLimit(10); + entry.setPendingMessageLimitStrategy(pendingMessageLimitStrategy); + + + UniquePropertyMessageEvictionStrategy messageEvictionStrategy = new UniquePropertyMessageEvictionStrategy(); + messageEvictionStrategy.setPropertyName("sequenceI"); + entry.setMessageEvictionStrategy(messageEvictionStrategy); + + // let evicted messages disappear + entry.setDeadLetterStrategy(null); + policyEntries.add(entry); + + final PolicyMap policyMap = new PolicyMap(); + policyMap.setPolicyEntries(policyEntries); + broker.setDestinationPolicy(policyMap); + + return broker; + } + + public void testEviction() throws Exception { + Connection conn = connectionFactory.createConnection(); + conn.start(); + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + javax.jms.Topic destination = session.createTopic("TEST"); + + MessageProducer producer = session.createProducer(destination); + MessageConsumer consumer = session.createConsumer(destination); + + for (int i = 0; i < 10; i++) { + for (int j = 0; j < 10; j++) { + TextMessage msg = session.createTextMessage("message " + i + j); + msg.setIntProperty("sequenceI", i); + msg.setIntProperty("sequenceJ", j); + producer.send(msg); + Thread.sleep(100); + } + } + + + for (int i = 0; i < 11; i++) { + javax.jms.Message msg = consumer.receive(1000); + assertNotNull(msg); + int seqI = msg.getIntProperty("sequenceI"); + int seqJ = msg.getIntProperty("sequenceJ"); + if (i ==0 ) { + assertEquals(0, seqI); + assertEquals(0, seqJ); + } else { + assertEquals(9, seqJ); + assertEquals(i - 1, seqI); + } + //System.out.println(msg.getIntProperty("sequenceI") + " " + msg.getIntProperty("sequenceJ")); + } + + javax.jms.Message msg = consumer.receive(1000); + assertNull(msg); + + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java new file mode 100644 index 0000000..12589ca --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region.cursors; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import javax.jms.Topic; +import org.apache.activemq.broker.BrokerService; + +/** + * + */ +public class CursorDurableTest extends CursorSupport { + + protected Destination getDestination(Session session) throws JMSException { + String topicName = getClass().getName(); + return session.createTopic(topicName); + } + + protected Connection getConsumerConnection(ConnectionFactory fac) throws JMSException { + Connection connection = fac.createConnection(); + connection.setClientID("testConsumer"); + connection.start(); + return connection; + } + + protected MessageConsumer getConsumer(Connection connection) throws Exception { + Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = (Topic)getDestination(consumerSession); + MessageConsumer consumer = consumerSession.createDurableSubscriber(topic, "testConsumer"); + return consumer; + } + + protected void configureBroker(BrokerService answer) throws Exception { + answer.setDeleteAllMessagesOnStartup(true); + answer.addConnector(bindAddress); + answer.setDeleteAllMessagesOnStartup(true); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/CursorQueueStoreTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/CursorQueueStoreTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/CursorQueueStoreTest.java new file mode 100644 index 0000000..b0668d9 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/CursorQueueStoreTest.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region.cursors; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.Session; + +import junit.framework.Test; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.region.policy.StorePendingQueueMessageStoragePolicy; + +/** + * + */ +public class CursorQueueStoreTest extends CursorSupport { + + protected Destination getDestination(Session session) throws JMSException { + String queueName = "QUEUE" + getClass().getName(); + return session.createQueue(queueName); + } + + protected Connection getConsumerConnection(ConnectionFactory fac) throws JMSException { + Connection connection = fac.createConnection(); + connection.setClientID("testConsumer"); + connection.start(); + return connection; + } + + protected MessageConsumer getConsumer(Connection connection) throws Exception { + Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination dest = getDestination(consumerSession); + MessageConsumer consumer = consumerSession.createConsumer(dest); + return consumer; + } + + protected void configureBroker(BrokerService answer) throws Exception { + PolicyEntry policy = new PolicyEntry(); + policy.setPendingQueuePolicy(new StorePendingQueueMessageStoragePolicy()); + PolicyMap pMap = new PolicyMap(); + pMap.setDefaultEntry(policy); + answer.setDestinationPolicy(pMap); + answer.setDeleteAllMessagesOnStartup(true); + answer.addConnector(bindAddress); + answer.setDeleteAllMessagesOnStartup(true); + } + + public static Test suite() { + return suite(CursorQueueStoreTest.class); + } + + public static void main(String[] args) { + junit.textui.TestRunner.run(suite()); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java new file mode 100644 index 0000000..3c014a6 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java @@ -0,0 +1,198 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region.cursors; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.CombinationTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +/** + * + */ +public abstract class CursorSupport extends CombinationTestSupport { + + public int MESSAGE_COUNT = 500; + public int PREFETCH_SIZE = 50; + private static final Logger LOG = LoggerFactory.getLogger(CursorSupport.class); + + protected BrokerService broker; + protected String bindAddress = "tcp://localhost:60706"; + + protected abstract Destination getDestination(Session session) throws JMSException; + + protected abstract MessageConsumer getConsumer(Connection connection) throws Exception; + + protected abstract void configureBroker(BrokerService answer) throws Exception; + + public void testSendFirstThenConsume() throws Exception { + ConnectionFactory factory = createConnectionFactory(); + Connection consumerConnection = getConsumerConnection(factory); + MessageConsumer consumer = getConsumer(consumerConnection); + consumerConnection.close(); + Connection producerConnection = factory.createConnection(); + producerConnection.start(); + Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(getDestination(session)); + List<Message> senderList = new ArrayList<Message>(); + for (int i = 0; i < MESSAGE_COUNT; i++) { + Message msg = session.createTextMessage("test" + i); + senderList.add(msg); + producer.send(msg); + } + producerConnection.close(); + // now consume the messages + consumerConnection = getConsumerConnection(factory); + // create durable subs + consumer = getConsumer(consumerConnection); + List<Message> consumerList = new ArrayList<Message>(); + for (int i = 0; i < MESSAGE_COUNT; i++) { + Message msg = consumer.receive(1000*5); + assertNotNull("Message "+i+" was missing.", msg); + consumerList.add(msg); + } + assertEquals(senderList, consumerList); + consumerConnection.close(); + } + + + public void initCombosForTestSendWhilstConsume() { + addCombinationValues("MESSAGE_COUNT", new Object[] {Integer.valueOf(400), + Integer.valueOf(500)}); + addCombinationValues("PREFETCH_SIZE", new Object[] {Integer.valueOf(100), + Integer.valueOf(50)}); + } + + public void testSendWhilstConsume() throws Exception { + ConnectionFactory factory = createConnectionFactory(); + Connection consumerConnection = getConsumerConnection(factory); + // create durable subs + MessageConsumer consumer = getConsumer(consumerConnection); + consumerConnection.close(); + Connection producerConnection = factory.createConnection(); + producerConnection.start(); + Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(getDestination(session)); + List<TextMessage> senderList = new ArrayList<TextMessage>(); + for (int i = 0; i < MESSAGE_COUNT / 10; i++) { + TextMessage msg = session.createTextMessage("test" + i); + senderList.add(msg); + producer.send(msg); + } + // now consume the messages + consumerConnection = getConsumerConnection(factory); + // create durable subs + consumer = getConsumer(consumerConnection); + final List<Message> consumerList = new ArrayList<Message>(); + final CountDownLatch latch = new CountDownLatch(1); + consumer.setMessageListener(new MessageListener() { + + public void onMessage(Message msg) { + try { + // sleep to act as a slow consumer + // which will force a mix of direct and polled dispatching + // using the cursor on the broker + Thread.sleep(50); + } catch (Exception e) { + e.printStackTrace(); + } + consumerList.add(msg); + if (consumerList.size() == MESSAGE_COUNT) { + latch.countDown(); + } + } + }); + for (int i = MESSAGE_COUNT / 10; i < MESSAGE_COUNT; i++) { + TextMessage msg = session.createTextMessage("test" + i); + senderList.add(msg); + producer.send(msg); + } + latch.await(300000, TimeUnit.MILLISECONDS); + producerConnection.close(); + consumerConnection.close(); + assertEquals("Still dipatching - count down latch not sprung", latch.getCount(), 0); + //assertEquals("cosumerList - expected: " + MESSAGE_COUNT + " but was: " + consumerList.size(), consumerList.size(), senderList.size()); + for (int i = 0; i < senderList.size(); i++) { + Message sent = senderList.get(i); + Message consumed = consumerList.get(i); + if (!sent.equals(consumed)) { + LOG.error("BAD MATCH AT POS " + i); + LOG.error(sent.toString()); + LOG.error(consumed.toString()); + /* + * log.error("\n\n\n\n\n"); for (int j = 0; j < + * consumerList.size(); j++) { log.error(consumerList.get(j)); } + */ + } + assertEquals("This should be the same at pos " + i + " in the list", sent.getJMSMessageID(), consumed.getJMSMessageID()); + } + } + + protected Connection getConsumerConnection(ConnectionFactory fac) throws JMSException { + Connection connection = fac.createConnection(); + connection.setClientID("testConsumer"); + connection.start(); + return connection; + } + + protected void setUp() throws Exception { + if (broker == null) { + broker = createBroker(); + } + super.setUp(); + } + + protected void tearDown() throws Exception { + super.tearDown(); + if (broker != null) { + broker.stop(); + } + } + + protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(bindAddress); + Properties props = new Properties(); + props.setProperty("prefetchPolicy.durableTopicPrefetch", "" + PREFETCH_SIZE); + props.setProperty("prefetchPolicy.optimizeDurableTopicPrefetch", "" + PREFETCH_SIZE); + props.setProperty("prefetchPolicy.queuePrefetch", "" + PREFETCH_SIZE); + cf.setProperties(props); + return cf; + } + + protected BrokerService createBroker() throws Exception { + BrokerService answer = new BrokerService(); + configureBroker(answer); + answer.start(); + return answer; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTestSupport.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTestSupport.java new file mode 100644 index 0000000..123263d --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTestSupport.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region.cursors; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.QueueMessageReference; +import org.apache.activemq.store.PList; +import org.apache.activemq.usage.SystemUsage; +import org.apache.activemq.util.ByteSequence; +import org.junit.After; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class FilePendingMessageCursorTestSupport { + + protected static final Logger LOG = LoggerFactory.getLogger(FilePendingMessageCursorTestSupport.class); + protected BrokerService brokerService; + protected FilePendingMessageCursor underTest; + + @After + public void stopBroker() throws Exception { + if (brokerService != null) { + brokerService.getTempDataStore().stop(); + } + } + + private void createBrokerWithTempStoreLimit() throws Exception { + brokerService = new BrokerService(); + brokerService.setUseJmx(false); + SystemUsage usage = brokerService.getSystemUsage(); + usage.getTempUsage().setLimit(1025*1024*15); + + // put something in the temp store to on demand initialise it + PList dud = brokerService.getTempDataStore().getPList("dud"); + dud.addFirst("A", new ByteSequence("A".getBytes())); + } + + @Test + public void testAddToEmptyCursorWhenTempStoreIsFull() throws Exception { + createBrokerWithTempStoreLimit(); + SystemUsage usage = brokerService.getSystemUsage(); + assertTrue("temp store is full: %" + usage.getTempUsage().getPercentUsage(), usage.getTempUsage().isFull()); + + underTest = new FilePendingMessageCursor(brokerService.getBroker(), "test", false); + underTest.setSystemUsage(usage); + + // ok to add + underTest.addMessageLast(QueueMessageReference.NULL_MESSAGE); + + assertFalse("cursor is not full", underTest.isFull()); + } + + @Test + public void testResetClearsIterator() throws Exception { + createBrokerWithTempStoreLimit(); + + underTest = new FilePendingMessageCursor(brokerService.getBroker(), "test", false); + // ok to add + underTest.addMessageLast(QueueMessageReference.NULL_MESSAGE); + + underTest.reset(); + underTest.release(); + + try { + underTest.hasNext(); + fail("expect npe on use of iterator after release"); + } catch (NullPointerException expected) {} + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java new file mode 100644 index 0000000..1401b35 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java @@ -0,0 +1,421 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region.cursors; + +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.AutoFailTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.region.policy.StorePendingQueueMessageStoragePolicy; +import org.apache.activemq.usage.MemoryUsage; +import org.apache.activemq.usage.StoreUsage; +import org.apache.activemq.usage.SystemUsage; +import org.apache.activemq.usage.TempUsage; +import org.apache.activemq.util.Wait; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Modified CursorSupport Unit test to reproduce the negative queue issue. + * + * Keys to reproducing: + * 1) Consecutive queues with listener on first sending to second queue + * 2) Push each queue to the memory limit + * This seems to help reproduce the issue more consistently, but + * we have seen times in our production environment where the + * negative queue can occur without. Our memory limits are + * very high in production and it still happens in varying + * frequency. + * 3) Prefetch + * Lowering the prefetch down to 10 and below seems to help + * reduce occurrences. + * 4) # of consumers per queue + * The issue occurs less with fewer consumers + * + * Things that do not affect reproduction: + * 1) Spring - we use spring in our production applications, but this test case works + * with or without it. + * 2) transacted + * + */ +public class NegativeQueueTest extends AutoFailTestSupport { + private static final Logger LOG = LoggerFactory.getLogger(NegativeQueueTest.class); + + public static SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMdd,hh:mm:ss:SSS"); + + private static final String QUEUE_1_NAME = "conn.test.queue.1"; + private static final String QUEUE_2_NAME = "conn.test.queue.2"; + + private static final long QUEUE_MEMORY_LIMIT = 2097152; + private static final long MEMORY_USAGE = 400000000; + private static final long TEMP_USAGE = 200000000; + private static final long STORE_USAGE = 1000000000; + // ensure we exceed the cache 70% + private static final int MESSAGE_COUNT = 2100; + + protected static final boolean TRANSACTED = true; + protected static final boolean DEBUG = true; + protected static int NUM_CONSUMERS = 20; + protected static int PREFETCH_SIZE = 1000; + + protected BrokerService broker; + protected String bindAddress = "tcp://localhost:0"; + + public void testWithDefaultPrefetch() throws Exception{ + PREFETCH_SIZE = 1000; + NUM_CONSUMERS = 20; + blastAndConsume(); + } + + public void x_testWithDefaultPrefetchFiveConsumers() throws Exception{ + PREFETCH_SIZE = 1000; + NUM_CONSUMERS = 5; + blastAndConsume(); + } + + public void x_testWithDefaultPrefetchTwoConsumers() throws Exception{ + PREFETCH_SIZE = 1000; + NUM_CONSUMERS = 2; + blastAndConsume(); + } + + public void testWithDefaultPrefetchOneConsumer() throws Exception{ + PREFETCH_SIZE = 1000; + NUM_CONSUMERS = 1; + blastAndConsume(); + } + + public void testWithMediumPrefetch() throws Exception{ + PREFETCH_SIZE = 50; + NUM_CONSUMERS = 20; + blastAndConsume(); + } + + public void x_testWithSmallPrefetch() throws Exception{ + PREFETCH_SIZE = 10; + NUM_CONSUMERS = 20; + blastAndConsume(); + } + + public void testWithNoPrefetch() throws Exception{ + PREFETCH_SIZE = 1; + NUM_CONSUMERS = 20; + blastAndConsume(); + } + + public void blastAndConsume() throws Exception { + LOG.info(getName()); + ConnectionFactory factory = createConnectionFactory(); + + //get proxy queues for statistics lookups + Connection proxyConnection = factory.createConnection(); + proxyConnection.start(); + Session proxySession = proxyConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final QueueViewMBean proxyQueue1 = getProxyToQueueViewMBean(proxySession.createQueue(QUEUE_1_NAME)); + final QueueViewMBean proxyQueue2 = getProxyToQueueViewMBean(proxySession.createQueue(QUEUE_2_NAME)); + + // LOAD THE QUEUE + Connection producerConnection = factory.createConnection(); + producerConnection.start(); + Session session = producerConnection.createSession(TRANSACTED, Session.AUTO_ACKNOWLEDGE); + Destination queue = session.createQueue(QUEUE_1_NAME); + MessageProducer producer = session.createProducer(queue); + List<TextMessage> senderList = new ArrayList<TextMessage>(); + for (int i = 0; i < MESSAGE_COUNT; i++) { + TextMessage msg = session.createTextMessage(i + " " + formatter.format(new Date())); + senderList.add(msg); + producer.send(msg); + if(TRANSACTED) session.commit(); + if(DEBUG && i%100 == 0){ + int index = (i/100)+1; + System.out.print(index-((index/10)*10)); + } + } + + //get access to the Queue info + if(DEBUG){ + System.out.println(""); + System.out.println("Queue1 Size = "+proxyQueue1.getQueueSize()); + System.out.println("Queue1 Memory % Used = "+proxyQueue1.getMemoryPercentUsage()); + System.out.println("Queue1 Memory Available = "+proxyQueue1.getMemoryLimit()); + } + + // FLUSH THE QUEUE + final CountDownLatch latch1 = new CountDownLatch(1); + final CountDownLatch latch2 = new CountDownLatch(1); + Connection[] consumerConnections1 = new Connection[NUM_CONSUMERS]; + List<Message> consumerList1 = new ArrayList<Message>(); + Connection[] consumerConnections2 = new Connection[NUM_CONSUMERS]; + Connection[] producerConnections2 = new Connection[NUM_CONSUMERS]; + List<Message> consumerList2 = new ArrayList<Message>(); + + for(int ix=0; ix<NUM_CONSUMERS; ix++){ + producerConnections2[ix] = factory.createConnection(); + producerConnections2[ix].start(); + consumerConnections1[ix] = getConsumerConnection(factory); + Session consumerSession = consumerConnections1[ix].createSession(TRANSACTED, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = consumerSession.createConsumer(session.createQueue(QUEUE_1_NAME)); + consumer.setMessageListener(new SessionAwareMessageListener(producerConnections2[ix], consumerSession, QUEUE_2_NAME, latch1, consumerList1)); + } + + latch1.await(200000, TimeUnit.MILLISECONDS); + if(DEBUG){ + System.out.println(""); + System.out.println("Queue2 Size = "+proxyQueue2.getQueueSize()); + System.out.println("Queue2 Memory % Used = "+proxyQueue2.getMemoryPercentUsage()); + System.out.println("Queue2 Memory Available = "+proxyQueue2.getMemoryLimit()); + } + + for(int ix=0; ix<NUM_CONSUMERS; ix++){ + consumerConnections2[ix] = getConsumerConnection(factory); + Session consumerSession = consumerConnections2[ix].createSession(TRANSACTED, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = consumerSession.createConsumer(session.createQueue(QUEUE_2_NAME)); + consumer.setMessageListener(new SessionAwareMessageListener(consumerSession, latch2, consumerList2)); + } + + boolean success = Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + boolean done = latch2.await(10, TimeUnit.SECONDS); + if(DEBUG){ + System.out.println(""); + System.out.println("Queue1 Size = "+proxyQueue1.getQueueSize()); + System.out.println("Queue1 Memory % Used = "+proxyQueue1.getMemoryPercentUsage()); + System.out.println("Queue2 Size = "+proxyQueue2.getQueueSize()); + System.out.println("Queue2 Memory % Used = "+proxyQueue2.getMemoryPercentUsage()); + System.out.println("Queue2 Memory Available = "+proxyQueue2.getMemoryLimit()); + } + return done; + } + }, 300 * 1000); + if (!success) { + dumpAllThreads("blocked waiting on 2"); + } + assertTrue("got all expected messages on 2", success); + + producerConnection.close(); + for(int ix=0; ix<NUM_CONSUMERS; ix++){ + consumerConnections1[ix].close(); + consumerConnections2[ix].close(); + producerConnections2[ix].close(); + } + + //let the consumer statistics on queue2 have time to update + Thread.sleep(500); + + if(DEBUG){ + System.out.println(""); + System.out.println("Queue1 Size = "+proxyQueue1.getQueueSize()); + System.out.println("Queue1 Memory % Used = "+proxyQueue1.getMemoryPercentUsage()); + System.out.println("Queue2 Size = "+proxyQueue2.getQueueSize()); + System.out.println("Queue2 Memory % Used = "+proxyQueue2.getMemoryPercentUsage()); + } + + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return 0 == proxyQueue1.getQueueSize(); + }}); + assertEquals("Queue1 has gone negative,",0, proxyQueue1.getQueueSize()); + + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return 0 == proxyQueue2.getQueueSize(); + }}); + assertEquals("Queue2 has gone negative,",0, proxyQueue2.getQueueSize()); + proxyConnection.close(); + + } + + private QueueViewMBean getProxyToQueueViewMBean(Queue queue) throws MalformedObjectNameException, JMSException { + final String prefix = "org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName="; + + ObjectName queueViewMBeanName = new ObjectName(prefix + queue.getQueueName()); + QueueViewMBean proxy = (QueueViewMBean) + broker.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true); + + return proxy; + } + + protected Connection getConsumerConnection(ConnectionFactory fac) throws JMSException { + Connection connection = fac.createConnection(); + connection.start(); + return connection; + } + + @Override + protected void setUp() throws Exception { + if (broker == null) { + broker = createBroker(); + } + super.setUp(); + } + + @Override + protected void tearDown() throws Exception { + super.tearDown(); + if (broker != null) { + broker.stop(); + broker.waitUntilStopped(); + } + } + + protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(bindAddress); + Properties props = new Properties(); + props.setProperty("prefetchPolicy.durableTopicPrefetch", "" + PREFETCH_SIZE); + props.setProperty("prefetchPolicy.optimizeDurableTopicPrefetch", "" + PREFETCH_SIZE); + props.setProperty("prefetchPolicy.queuePrefetch", "" + PREFETCH_SIZE); + cf.setProperties(props); + return cf; + } + + protected BrokerService createBroker() throws Exception { + BrokerService answer = new BrokerService(); + configureBroker(answer); + answer.start(); + answer.waitUntilStarted(); + bindAddress = answer.getTransportConnectors().get(0).getConnectUri().toString(); + return answer; + } + + protected void configureBroker(BrokerService answer) throws Exception { + PolicyEntry policy = new PolicyEntry(); + policy.setMemoryLimit(QUEUE_MEMORY_LIMIT); + policy.setPendingQueuePolicy(new StorePendingQueueMessageStoragePolicy()); + + // disable the cache to be sure setBatch is the problem + // will get lots of duplicates + // real problem is sync between cursor and store add - leads to out or order messages + // in the cursor so setBatch can break. + // policy.setUseCache(false); + + PolicyMap pMap = new PolicyMap(); + pMap.setDefaultEntry(policy); + answer.setDestinationPolicy(pMap); + answer.setDeleteAllMessagesOnStartup(true); + answer.addConnector("tcp://localhost:0"); + + MemoryUsage memoryUsage = new MemoryUsage(); + memoryUsage.setLimit(MEMORY_USAGE); + memoryUsage.setPercentUsageMinDelta(20); + + TempUsage tempUsage = new TempUsage(); + tempUsage.setLimit(TEMP_USAGE); + + StoreUsage storeUsage = new StoreUsage(); + storeUsage.setLimit(STORE_USAGE); + + SystemUsage systemUsage = new SystemUsage(); + systemUsage.setMemoryUsage(memoryUsage); + systemUsage.setTempUsage(tempUsage); + systemUsage.setStoreUsage(storeUsage); + answer.setSystemUsage(systemUsage); + } + + /** + * Message listener that is given the Session for transacted consumers + */ + class SessionAwareMessageListener implements MessageListener{ + private final List<Message> consumerList; + private final CountDownLatch latch; + private final Session consumerSession; + private Session producerSession; + private MessageProducer producer; + + public SessionAwareMessageListener(Session consumerSession, CountDownLatch latch, List<Message> consumerList){ + this(null, consumerSession, null, latch, consumerList); + } + + public SessionAwareMessageListener(Connection producerConnection, Session consumerSession, String outQueueName, + CountDownLatch latch, List<Message> consumerList){ + this.consumerList = consumerList; + this.latch = latch; + this.consumerSession = consumerSession; + + if(producerConnection != null){ + try { + producerSession = producerConnection.createSession(TRANSACTED, Session.AUTO_ACKNOWLEDGE); + Destination queue = producerSession.createQueue(outQueueName); + producer = producerSession.createProducer(queue); + } catch (JMSException e) { + e.printStackTrace(); + } + } + } + + @Override + public void onMessage(Message msg) { + try { + if(producer == null){ + // sleep to act as a slow consumer + // which will force a mix of direct and polled dispatching + // using the cursor on the broker + Thread.sleep(50); + }else{ + producer.send(msg); + if(TRANSACTED) producerSession.commit(); + } + } catch (Exception e) { + e.printStackTrace(); + } + + synchronized(consumerList){ + consumerList.add(msg); + if(DEBUG && consumerList.size()%100 == 0) { + int index = consumerList.size()/100; + System.out.print(index-((index/10)*10)); + } + if (consumerList.size() == MESSAGE_COUNT) { + latch.countDown(); + } + } + if(TRANSACTED){ + try { + consumerSession.commit(); + } catch (JMSException e) { + e.printStackTrace(); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java new file mode 100644 index 0000000..79d7e6c --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java @@ -0,0 +1,444 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region.cursors; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedList; + +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.util.IdGenerator; +import org.junit.Test; + +public class OrderPendingListTest { + + @Test + public void testAddMessageFirst() throws Exception { + + OrderedPendingList list = new OrderedPendingList(); + + list.addMessageFirst(new TestMessageReference(1)); + list.addMessageFirst(new TestMessageReference(2)); + list.addMessageFirst(new TestMessageReference(3)); + list.addMessageFirst(new TestMessageReference(4)); + list.addMessageFirst(new TestMessageReference(5)); + + assertTrue(list.size() == 5); + assertEquals(5, list.getAsList().size()); + + Iterator<MessageReference> iter = list.iterator(); + int lastId = list.size(); + while (iter.hasNext()) { + assertEquals(lastId--, iter.next().getMessageId().getProducerSequenceId()); + } + } + + @Test + public void testAddMessageLast() throws Exception { + + OrderedPendingList list = new OrderedPendingList(); + + list.addMessageLast(new TestMessageReference(1)); + list.addMessageLast(new TestMessageReference(2)); + list.addMessageLast(new TestMessageReference(3)); + list.addMessageLast(new TestMessageReference(4)); + list.addMessageLast(new TestMessageReference(5)); + + assertTrue(list.size() == 5); + assertEquals(5, list.getAsList().size()); + + Iterator<MessageReference> iter = list.iterator(); + int lastId = 1; + while (iter.hasNext()) { + assertEquals(lastId++, iter.next().getMessageId().getProducerSequenceId()); + } + } + + @Test + public void testClear() throws Exception { + OrderedPendingList list = new OrderedPendingList(); + + list.addMessageFirst(new TestMessageReference(1)); + list.addMessageFirst(new TestMessageReference(2)); + list.addMessageFirst(new TestMessageReference(3)); + list.addMessageFirst(new TestMessageReference(4)); + list.addMessageFirst(new TestMessageReference(5)); + + assertFalse(list.isEmpty()); + assertTrue(list.size() == 5); + assertEquals(5, list.getAsList().size()); + + list.clear(); + + assertTrue(list.isEmpty()); + assertTrue(list.size() == 0); + assertEquals(0, list.getAsList().size()); + + list.addMessageFirst(new TestMessageReference(1)); + list.addMessageLast(new TestMessageReference(2)); + list.addMessageLast(new TestMessageReference(3)); + list.addMessageFirst(new TestMessageReference(4)); + list.addMessageLast(new TestMessageReference(5)); + + assertFalse(list.isEmpty()); + assertTrue(list.size() == 5); + assertEquals(5, list.getAsList().size()); + } + + @Test + public void testIsEmpty() throws Exception { + OrderedPendingList list = new OrderedPendingList(); + assertTrue(list.isEmpty()); + + list.addMessageFirst(new TestMessageReference(1)); + list.addMessageFirst(new TestMessageReference(2)); + list.addMessageFirst(new TestMessageReference(3)); + list.addMessageFirst(new TestMessageReference(4)); + list.addMessageFirst(new TestMessageReference(5)); + + assertFalse(list.isEmpty()); + list.clear(); + assertTrue(list.isEmpty()); + } + + @Test + public void testSize() { + OrderedPendingList list = new OrderedPendingList(); + assertTrue(list.isEmpty()); + + assertTrue(list.size() == 0); + list.addMessageFirst(new TestMessageReference(1)); + assertTrue(list.size() == 1); + list.addMessageLast(new TestMessageReference(2)); + assertTrue(list.size() == 2); + list.addMessageFirst(new TestMessageReference(3)); + assertTrue(list.size() == 3); + list.addMessageLast(new TestMessageReference(4)); + assertTrue(list.size() == 4); + list.addMessageFirst(new TestMessageReference(5)); + assertTrue(list.size() == 5); + + assertFalse(list.isEmpty()); + list.clear(); + assertTrue(list.isEmpty()); + assertTrue(list.size() == 0); + } + + @Test + public void testRemove() throws Exception { + + OrderedPendingList list = new OrderedPendingList(); + + TestMessageReference toRemove = new TestMessageReference(6); + + list.addMessageFirst(new TestMessageReference(1)); + list.addMessageFirst(new TestMessageReference(2)); + list.addMessageFirst(new TestMessageReference(3)); + list.addMessageFirst(new TestMessageReference(4)); + list.addMessageFirst(new TestMessageReference(5)); + + assertTrue(list.size() == 5); + assertEquals(5, list.getAsList().size()); + + list.addMessageLast(toRemove); + list.remove(toRemove); + + assertTrue(list.size() == 5); + assertEquals(5, list.getAsList().size()); + + list.remove(toRemove); + + assertTrue(list.size() == 5); + assertEquals(5, list.getAsList().size()); + + Iterator<MessageReference> iter = list.iterator(); + int lastId = list.size(); + while (iter.hasNext()) { + assertEquals(lastId--, iter.next().getMessageId().getProducerSequenceId()); + } + + list.remove(null); + } + + @Test + public void testContains() throws Exception { + + OrderedPendingList list = new OrderedPendingList(); + + TestMessageReference toRemove = new TestMessageReference(6); + + assertFalse(list.contains(toRemove)); + assertFalse(list.contains(null)); + + list.addMessageFirst(new TestMessageReference(1)); + list.addMessageFirst(new TestMessageReference(2)); + list.addMessageFirst(new TestMessageReference(3)); + list.addMessageFirst(new TestMessageReference(4)); + list.addMessageFirst(new TestMessageReference(5)); + + assertTrue(list.size() == 5); + assertEquals(5, list.getAsList().size()); + + list.addMessageLast(toRemove); + assertTrue(list.size() == 6); + assertTrue(list.contains(toRemove)); + list.remove(toRemove); + assertFalse(list.contains(toRemove)); + + assertTrue(list.size() == 5); + assertEquals(5, list.getAsList().size()); + } + + @Test + public void testValues() throws Exception { + + OrderedPendingList list = new OrderedPendingList(); + + TestMessageReference toRemove = new TestMessageReference(6); + + assertFalse(list.contains(toRemove)); + + list.addMessageFirst(new TestMessageReference(1)); + list.addMessageFirst(new TestMessageReference(2)); + list.addMessageFirst(new TestMessageReference(3)); + list.addMessageFirst(new TestMessageReference(4)); + list.addMessageFirst(new TestMessageReference(5)); + + Collection<MessageReference> values = list.values(); + assertEquals(5, values.size()); + + for (MessageReference msg : values) { + assertTrue(values.contains(msg)); + } + + assertFalse(values.contains(toRemove)); + + list.addMessageLast(toRemove); + values = list.values(); + assertEquals(6, values.size()); + for (MessageReference msg : values) { + assertTrue(values.contains(msg)); + } + + assertTrue(values.contains(toRemove)); + } + + @Test + public void testAddAll() throws Exception { + OrderedPendingList list = new OrderedPendingList(); + TestPendingList source = new TestPendingList(); + + source.addMessageFirst(new TestMessageReference(1)); + source.addMessageFirst(new TestMessageReference(2)); + source.addMessageFirst(new TestMessageReference(3)); + source.addMessageFirst(new TestMessageReference(4)); + source.addMessageFirst(new TestMessageReference(5)); + + assertTrue(list.isEmpty()); + assertEquals(5, source.size()); + list.addAll(source); + assertEquals(5, list.size()); + + for (MessageReference message : source) { + assertTrue(list.contains(message)); + } + + list.addAll(null); + } + + static class TestPendingList implements PendingList { + + private final LinkedList<MessageReference> theList = new LinkedList<MessageReference>(); + + @Override + public boolean isEmpty() { + return theList.isEmpty(); + } + + @Override + public void clear() { + theList.clear(); + } + + @Override + public PendingNode addMessageFirst(MessageReference message) { + theList.addFirst(message); + return new PendingNode(null, message); + } + + @Override + public PendingNode addMessageLast(MessageReference message) { + theList.addLast(message); + return new PendingNode(null, message); + } + + @Override + public PendingNode remove(MessageReference message) { + if (theList.remove(message)) { + return new PendingNode(null, message); + } else { + return null; + } + } + + @Override + public int size() { + return theList.size(); + } + + @Override + public Iterator<MessageReference> iterator() { + return theList.iterator(); + } + + @Override + public boolean contains(MessageReference message) { + return theList.contains(message); + } + + @Override + public Collection<MessageReference> values() { + return theList; + } + + @Override + public void addAll(PendingList pendingList) { + for(MessageReference messageReference : pendingList) { + theList.add(messageReference); + } + } + + @Override + public MessageReference get(MessageId messageId) { + for(MessageReference messageReference : theList) { + if (messageReference.getMessageId().equals(messageId)) { + return messageReference; + } + } + return null; + } + } + + static class TestMessageReference implements MessageReference { + + private static final IdGenerator id = new IdGenerator(); + + private MessageId messageId; + private int referenceCount = 0; + + public TestMessageReference(int sequenceId) { + messageId = new MessageId(id.generateId() + ":1", sequenceId); + } + + @Override + public MessageId getMessageId() { + return messageId; + } + + @Override + public Message getMessageHardRef() { + return null; + } + + @Override + public Message getMessage() { + return null; + } + + @Override + public boolean isPersistent() { + return false; + } + + @Override + public Destination getRegionDestination() { + return null; + } + + @Override + public int getRedeliveryCounter() { + return 0; + } + + @Override + public void incrementRedeliveryCounter() { + } + + @Override + public int getReferenceCount() { + return this.referenceCount; + } + + @Override + public int incrementReferenceCount() { + return this.referenceCount++; + } + + @Override + public int decrementReferenceCount() { + return this.referenceCount--; + } + + @Override + public ConsumerId getTargetConsumerId() { + return null; + } + + @Override + public int getSize() { + return 1; + } + + @Override + public long getExpiration() { + return 0; + } + + @Override + public String getGroupID() { + return null; + } + + @Override + public int getGroupSequence() { + return 0; + } + + @Override + public boolean isExpired() { + return false; + } + + @Override + public boolean isDropped() { + return false; + } + + @Override + public boolean isAdvisory() { + return false; + } + } +}
