Repository: qpid-broker-j Updated Branches: refs/heads/master 40a7fdbe4 -> b1a7aacc4
QPID-6933: [System Tests] Refactor last value queue tests as JMS 1.1 system test Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/b1a7aacc Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/b1a7aacc Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/b1a7aacc Branch: refs/heads/master Commit: b1a7aacc4c61ff07a9da6d6c2ea4db3ee41e016d Parents: b4e6fcf Author: Alex Rudyy <[email protected]> Authored: Thu Dec 28 21:43:51 2017 +0000 Committer: Alex Rudyy <[email protected]> Committed: Thu Dec 28 21:48:24 2017 +0000 ---------------------------------------------------------------------- .../extensions/queue/LastValueQueueTest.java | 598 +++++++++++++++++++ .../qpid/server/queue/LastValueQueueTest.java | 573 ------------------ test-profiles/CPPExcludes | 1 - 3 files changed, 598 insertions(+), 574 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b1a7aacc/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/LastValueQueueTest.java ---------------------------------------------------------------------- diff --git a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/LastValueQueueTest.java b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/LastValueQueueTest.java new file mode 100644 index 0000000..1313417 --- /dev/null +++ b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/LastValueQueueTest.java @@ -0,0 +1,598 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.systests.jms_1_1.extensions.queue; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.naming.NamingException; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.server.queue.LastValueQueue; +import org.apache.qpid.systests.JmsTestBase; + +public class LastValueQueueTest extends JmsTestBase +{ + private static final Logger LOGGER = LoggerFactory.getLogger(LastValueQueueTest.class); + + private static final String MESSAGE_SEQUENCE_NUMBER_PROPERTY = "msg"; + private static final String KEY_PROPERTY = "key"; + + private static final int MSG_COUNT = 400; + + @Test + public void testConflation() throws Exception + { + String queueName = getTestName(); + createConflationQueue(queueName, KEY_PROPERTY, false); + Queue queue = createQueue(queueName); + + sendMessages(queue, 0, MSG_COUNT); + + Connection consumerConnection = getConnection(); + try + { + Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = consumerSession.createConsumer(queue); + consumerConnection.start(); + + List<Message> messages = new ArrayList<>(); + Message received; + while ((received = consumer.receive(getReceiveTimeout())) != null) + { + messages.add(received); + } + + assertEquals("Unexpected number of messages received", 10, messages.size()); + + for (int i = 0; i < 10; i++) + { + Message msg = messages.get(i); + assertEquals("Unexpected message number received", + MSG_COUNT - 10 + i, + msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); + } + } + finally + { + consumerConnection.close(); + } + } + + @Test + public void testConflationWithRelease() throws Exception + { + String queueName = getTestName(); + createConflationQueue(queueName, KEY_PROPERTY, false); + Queue queue = createQueue(queueName); + + sendMessages(queue, 0, MSG_COUNT / 2); + + Connection consumerConnection = getConnection(); + try + { + Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageConsumer consumer = consumerSession.createConsumer(queue); + consumerConnection.start(); + + Message received; + List<Message> messages = new ArrayList<>(); + while ((received = consumer.receive(getReceiveTimeout())) != null) + { + messages.add(received); + } + + assertEquals("Unexpected number of messages received", 10, messages.size()); + + for (int i = 0; i < 10; i++) + { + Message msg = messages.get(i); + assertEquals("Unexpected message number received", + MSG_COUNT / 2 - 10 + i, + msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); + } + } + finally + { + consumerConnection.close(); + } + + sendMessages(queue, MSG_COUNT / 2, MSG_COUNT); + + consumerConnection = getConnection(); + try + { + Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = consumerSession.createConsumer(queue); + consumerConnection.start(); + + Message received; + List<Message> messages = new ArrayList<>(); + while ((received = consumer.receive(getReceiveTimeout())) != null) + { + messages.add(received); + } + + assertEquals("Unexpected number of messages received", 10, messages.size()); + + for (int i = 0; i < 10; i++) + { + Message msg = messages.get(i); + assertEquals("Unexpected message number received", + MSG_COUNT - 10 + i, + msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); + } + } + finally + { + consumerConnection.close(); + } + } + + @Test + public void testConflationWithReleaseAfterNewPublish() throws Exception + { + String queueName = getTestName(); + createConflationQueue(queueName, KEY_PROPERTY, false); + Queue queue = createQueue(queueName); + + sendMessages(queue, 0, MSG_COUNT / 2); + + Connection consumerConnection = getConnection(); + try + { + Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageConsumer consumer = consumerSession.createConsumer(queue); + consumerConnection.start(); + + Message received; + List<Message> messages = new ArrayList<>(); + while ((received = consumer.receive(getReceiveTimeout())) != null) + { + messages.add(received); + } + + assertEquals("Unexpected number of messages received", 10, messages.size()); + + for (int i = 0; i < 10; i++) + { + Message msg = messages.get(i); + assertEquals("Unexpected message number received", + MSG_COUNT / 2 - 10 + i, + msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); + } + + consumer.close(); + + sendMessages(queue, MSG_COUNT / 2, MSG_COUNT); + + consumerSession.close(); + } + finally + { + consumerConnection.close(); + } + + consumerConnection = getConnection(); + try + { + Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = consumerSession.createConsumer(queue); + consumerConnection.start(); + + Message received; + List<Message> messages = new ArrayList<>(); + while ((received = consumer.receive(getReceiveTimeout())) != null) + { + messages.add(received); + } + + assertEquals("Unexpected number of messages received", 10, messages.size()); + + for (int i = 0; i < 10; i++) + { + Message msg = messages.get(i); + assertEquals("Unexpected message number received", + MSG_COUNT - 10 + i, + msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); + } + } + finally + { + consumerConnection.close(); + } + } + + @Test + public void testConflatedQueueDepth() throws Exception + { + String queueName = getTestName(); + createConflationQueue(queueName, KEY_PROPERTY, false); + Queue queue = createQueue(queueName); + + sendMessages(queue, 0, MSG_COUNT); + + final long queueDepth = getTotalDepthOfQueuesMessages(); + + assertEquals(10, queueDepth); + } + + @Test + public void testConflationBrowser() throws Exception + { + String queueName = getTestName(); + createConflationQueue(queueName, KEY_PROPERTY, true); + Queue queue = createQueue(queueName); + + sendMessages(queue, 0, MSG_COUNT); + + Connection consumerConnection = getConnection(); + try + { + Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageConsumer consumer = consumerSession.createConsumer(queue); + consumerConnection.start(); + Message received; + List<Message> messages = new ArrayList<>(); + while ((received = consumer.receive(getReceiveTimeout())) != null) + { + messages.add(received); + } + + assertEquals("Unexpected number of messages received", 10, messages.size()); + + for (int i = 0; i < 10; i++) + { + Message msg = messages.get(i); + assertEquals("Unexpected message number received", + MSG_COUNT - 10 + i, + msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); + } + + messages.clear(); + + sendMessages(queue, MSG_COUNT, MSG_COUNT + 1); + + while ((received = consumer.receive(getReceiveTimeout())) != null) + { + messages.add(received); + } + assertEquals("Unexpected number of messages received", 1, messages.size()); + assertEquals("Unexpected message number received", + MSG_COUNT, + messages.get(0).getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); + } + finally + { + consumerConnection.close(); + } + } + + @Test + public void testConflation2Browsers() throws Exception + { + String queueName = getTestName(); + createConflationQueue(queueName, KEY_PROPERTY, true); + Queue queue = createQueue(queueName); + + sendMessages(queue, 0, MSG_COUNT); + + Connection consumerConnection = getConnection(); + try + { + Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = consumerSession.createConsumer(queue); + MessageConsumer consumer2 = consumerSession.createConsumer(queue); + + consumerConnection.start(); + List<Message> messages = new ArrayList<>(); + List<Message> messages2 = new ArrayList<>(); + Message received = consumer.receive(getReceiveTimeout()); + Message received2 = consumer2.receive(getReceiveTimeout()); + + while (received != null || received2 != null) + { + if (received != null) + { + messages.add(received); + } + if (received2 != null) + { + messages2.add(received2); + } + + received = consumer.receive(getReceiveTimeout()); + received2 = consumer2.receive(getReceiveTimeout()); + } + + assertEquals("Unexpected number of messages received on first browser", 10, messages.size()); + assertEquals("Unexpected number of messages received on second browser", 10, messages2.size()); + + for (int i = 0; i < 10; i++) + { + Message msg = messages.get(i); + assertEquals("Unexpected message number received on first browser", + MSG_COUNT - 10 + i, + msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); + msg = messages2.get(i); + assertEquals("Unexpected message number received on second browser", + MSG_COUNT - 10 + i, + msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); + } + } + finally + { + consumerConnection.close(); + } + } + + @Test + public void testParallelProductionAndConsumption() throws Exception + { + String queueName = getTestName(); + createConflationQueue(queueName, KEY_PROPERTY, true); + Queue queue = createQueue(queueName); + + // Start producing threads that send messages + BackgroundMessageProducer messageProducer1 = new BackgroundMessageProducer("Message sender1", queue); + messageProducer1.startSendingMessages(); + BackgroundMessageProducer messageProducer2 = new BackgroundMessageProducer("Message sender2", queue); + messageProducer2.startSendingMessages(); + + Map<String, Integer> lastReceivedMessages = receiveMessages(messageProducer1, queue); + + messageProducer1.join(); + messageProducer2.join(); + + final Map<String, Integer> lastSentMessages1 = messageProducer1.getMessageSequenceNumbersByKey(); + assertEquals("Unexpected number of last sent messages sent by producer1", 2, lastSentMessages1.size()); + final Map<String, Integer> lastSentMessages2 = messageProducer2.getMessageSequenceNumbersByKey(); + assertEquals(lastSentMessages1, lastSentMessages2); + + assertEquals("The last message sent for each key should match the last message received for that key", + lastSentMessages1, lastReceivedMessages); + + assertNull("Unexpected exception from background producer thread", messageProducer1.getException()); + } + + private Map<String, Integer> receiveMessages(BackgroundMessageProducer producer, final Queue queue) throws Exception + { + producer.waitUntilQuarterOfMessagesSentToEncourageConflation(); + + Map<String, Integer> messageSequenceNumbersByKey = new HashMap<>(); + Connection consumerConnection = getConnectionBuilder().setPrefetch(1).build(); + try + { + + Session _consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + LOGGER.info("Starting to receive"); + + MessageConsumer _consumer = _consumerSession.createConsumer(queue); + consumerConnection.start(); + + Message message; + int numberOfShutdownsReceived = 0; + int numberOfMessagesReceived = 0; + while (numberOfShutdownsReceived < 2) + { + message = _consumer.receive(getReceiveTimeout()); + assertNotNull("null received after " + + numberOfMessagesReceived + + " messages and " + + numberOfShutdownsReceived + + " shutdowns", message); + + if (message.propertyExists(BackgroundMessageProducer.SHUTDOWN)) + { + numberOfShutdownsReceived++; + } + else + { + numberOfMessagesReceived++; + putMessageInMap(message, messageSequenceNumbersByKey); + } + } + + LOGGER.info("Finished receiving. Received " + numberOfMessagesReceived + " message(s) in total"); + } + finally + { + consumerConnection.close(); + } + return messageSequenceNumbersByKey; + } + + private void putMessageInMap(Message message, Map<String, Integer> messageSequenceNumbersByKey) throws JMSException + { + String keyValue = message.getStringProperty(KEY_PROPERTY); + Integer messageSequenceNumber = message.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY); + messageSequenceNumbersByKey.put(keyValue, messageSequenceNumber); + } + + private class BackgroundMessageProducer + { + static final String SHUTDOWN = "SHUTDOWN"; + + private final String _threadName; + private final Queue _queue; + + private volatile Exception _exception; + + private Thread _thread; + private Map<String, Integer> _messageSequenceNumbersByKey = new HashMap<>(); + private CountDownLatch _quarterOfMessagesSentLatch = new CountDownLatch(MSG_COUNT / 4); + + BackgroundMessageProducer(String threadName, Queue queue) + { + _threadName = threadName; + _queue = queue; + } + + void waitUntilQuarterOfMessagesSentToEncourageConflation() throws InterruptedException + { + final long latchTimeout = 60000; + boolean success = _quarterOfMessagesSentLatch.await(latchTimeout, TimeUnit.MILLISECONDS); + assertTrue("Failed to be notified that 1/4 of the messages have been sent within " + latchTimeout + " ms.", + success); + LOGGER.info("Quarter of messages sent"); + } + + public Exception getException() + { + return _exception; + } + + Map<String, Integer> getMessageSequenceNumbersByKey() + { + return Collections.unmodifiableMap(_messageSequenceNumbersByKey); + } + + void startSendingMessages() + { + Runnable messageSender = () -> { + try + { + LOGGER.info("Starting to send in background thread"); + Connection producerConnection = getConnection(); + try + { + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer backgroundProducer = producerSession.createProducer(_queue); + for (int messageNumber = 0; messageNumber < MSG_COUNT; messageNumber++) + { + Message message = nextMessage(messageNumber, producerSession, 2); + backgroundProducer.send(message); + + putMessageInMap(message, _messageSequenceNumbersByKey); + _quarterOfMessagesSentLatch.countDown(); + } + + Message shutdownMessage = producerSession.createMessage(); + shutdownMessage.setBooleanProperty(SHUTDOWN, true); + // make sure the shutdown messages have distinct keys because the Qpid Cpp Broker will + // otherwise consider them to have the same key. + shutdownMessage.setStringProperty(KEY_PROPERTY, _threadName); + + backgroundProducer.send(shutdownMessage); + } + finally + { + producerConnection.close(); + } + + LOGGER.info("Finished sending in background thread"); + } + catch (Exception e) + { + _exception = e; + } + }; + + _thread = new Thread(messageSender); + _thread.setName(_threadName); + _thread.start(); + } + + void join() throws InterruptedException + { + final int timeoutInMillis = 120000; + _thread.join(timeoutInMillis); + assertFalse("Expected producer thread to finish within " + timeoutInMillis + "ms", _thread.isAlive()); + } + } + + private void createConflationQueue(final String queueName, + final String keyProperty, final boolean enforceBrowseOnly) throws Exception + { + final Map<String, Object> arguments = new HashMap<>(); + arguments.put(LastValueQueue.LVQ_KEY, keyProperty); + if (enforceBrowseOnly) + { + arguments.put("ensureNondestructiveConsumers", true); + } + createEntityUsingAmqpManagement(queueName, "org.apache.qpid.LastValueQueue", arguments); + } + + private Message nextMessage(int msg, Session producerSession) throws JMSException + { + return nextMessage(msg, producerSession, 10); + } + + private Message nextMessage(int msg, Session producerSession, int numberOfUniqueKeyValues) throws JMSException + { + Message send = producerSession.createTextMessage("Message: " + msg); + + final String keyValue = String.valueOf(msg % numberOfUniqueKeyValues); + send.setStringProperty(KEY_PROPERTY, keyValue); + send.setIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY, msg); + + return send; + } + + private void sendMessages(final Queue queue, final int fromIndex, final int toIndex) + throws JMSException, NamingException + { + Connection producerConnection = getConnection(); + try + { + Session producerSession = producerConnection.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = producerSession.createProducer(queue); + + for (int msg = fromIndex; msg < toIndex; msg++) + { + producer.send(nextMessage(msg, producerSession)); + producerSession.commit(); + } + + producer.close(); + producerSession.close(); + } + finally + { + producerConnection.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b1a7aacc/systests/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java ---------------------------------------------------------------------- diff --git a/systests/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java b/systests/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java deleted file mode 100644 index 6d22adf..0000000 --- a/systests/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java +++ /dev/null @@ -1,573 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.server.queue; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.qpid.QpidException; -import org.apache.qpid.test.utils.QpidBrokerTestCase; - -public class LastValueQueueTest extends QpidBrokerTestCase -{ - private static final Logger LOGGER = LoggerFactory.getLogger(LastValueQueueTest.class); - - private static final String MESSAGE_SEQUENCE_NUMBER_PROPERTY = "msg"; - private static final String KEY_PROPERTY = "key"; - - private static final int MSG_COUNT = 400; - - private String _queueName; - private Queue _queue; - private Connection _producerConnection; - private MessageProducer _producer; - private Session _producerSession; - private Connection _consumerConnection; - private Session _consumerSession; - private MessageConsumer _consumer; - - @Override - public void setUp() throws Exception - { - super.setUp(); - - _queueName = getTestQueueName(); - _producerConnection = getConnection(); - _producerSession = _producerConnection.createSession(true, Session.SESSION_TRANSACTED); - } - - public void testConflation() throws Exception - { - _consumerConnection = getConnection(); - _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - createConflationQueue(_producerSession, false); - _producer = _producerSession.createProducer(_queue); - - for (int msg = 0; msg < MSG_COUNT; msg++) - { - _producer.send(nextMessage(msg, _producerSession)); - _producerSession.commit(); - } - - _producer.close(); - _producerSession.close(); - _producerConnection.close(); - - _consumer = _consumerSession.createConsumer(_queue); - _consumerConnection.start(); - Message received; - - List<Message> messages = new ArrayList<>(); - while((received = _consumer.receive(getReceiveTimeout())) != null) - { - messages.add(received); - } - - assertEquals("Unexpected number of messages received",10,messages.size()); - - for(int i = 0 ; i < 10; i++) - { - Message msg = messages.get(i); - assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); - } - } - - public void testConflationWithRelease() throws Exception - { - _consumerConnection = getConnection(); - _consumerSession = _consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - - - createConflationQueue(_producerSession, false); - _producer = _producerSession.createProducer(_queue); - - for (int msg = 0; msg < MSG_COUNT/2; msg++) - { - _producer.send(nextMessage(msg, _producerSession)); - _producerSession.commit(); - } - - _consumer = _consumerSession.createConsumer(_queue); - _consumerConnection.start(); - Message received; - List<Message> messages = new ArrayList<>(); - while((received = _consumer.receive(getReceiveTimeout())) != null) - { - messages.add(received); - } - - assertEquals("Unexpected number of messages received",10,messages.size()); - - for(int i = 0 ; i < 10; i++) - { - Message msg = messages.get(i); - assertEquals("Unexpected message number received", MSG_COUNT/2 - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); - } - - _consumerSession.close(); - _consumerConnection.close(); - - - _consumerConnection = getConnection(); - _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - - for (int msg = MSG_COUNT/2; msg < MSG_COUNT; msg++) - { - _producer.send(nextMessage(msg, _producerSession)); - _producerSession.commit(); - } - - _consumer = _consumerSession.createConsumer(_queue); - _consumerConnection.start(); - - messages = new ArrayList<>(); - while((received = _consumer.receive(getReceiveTimeout())) != null) - { - messages.add(received); - } - - assertEquals("Unexpected number of messages received",10,messages.size()); - - for(int i = 0 ; i < 10; i++) - { - Message msg = messages.get(i); - assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); - } - - } - - - public void testConflationWithReleaseAfterNewPublish() throws Exception - { - _consumerConnection = getConnection(); - _consumerSession = _consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - - - createConflationQueue(_producerSession, false); - _producer = _producerSession.createProducer(_queue); - - for (int msg = 0; msg < MSG_COUNT/2; msg++) - { - _producer.send(nextMessage(msg, _producerSession)); - _producerSession.commit(); - } - - _consumer = _consumerSession.createConsumer(_queue); - _consumerConnection.start(); - Message received; - List<Message> messages = new ArrayList<>(); - while((received = _consumer.receive(getReceiveTimeout())) != null) - { - messages.add(received); - } - - assertEquals("Unexpected number of messages received",10,messages.size()); - - for(int i = 0 ; i < 10; i++) - { - Message msg = messages.get(i); - assertEquals("Unexpected message number received", MSG_COUNT/2 - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); - } - - _consumer.close(); - - for (int msg = MSG_COUNT/2; msg < MSG_COUNT; msg++) - { - _producer.send(nextMessage(msg, _producerSession)); - } - _producerSession.commit(); - - // this causes the "old" messages to be released - _consumerSession.close(); - _consumerConnection.close(); - - - _consumerConnection = getConnection(); - _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - - - _consumer = _consumerSession.createConsumer(_queue); - _consumerConnection.start(); - - messages = new ArrayList<>(); - while((received = _consumer.receive(getReceiveTimeout())) != null) - { - messages.add(received); - } - - assertEquals("Unexpected number of messages received",10,messages.size()); - - for(int i = 0 ; i < 10; i++) - { - Message msg = messages.get(i); - assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); - } - - } - - public void testConflatedQueueDepth() throws Exception - { - _consumerConnection = getConnection(); - _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - createConflationQueue(_producerSession, false); - _producer = _producerSession.createProducer(_queue); - - for (int msg = 0; msg < MSG_COUNT; msg++) - { - _producer.send(nextMessage(msg, _producerSession)); - _producerSession.commit(); - } - _producerConnection.start(); - final long queueDepth = getQueueDepth(_producerConnection,_queue); - - assertEquals(10, queueDepth); - } - - public void testConflationBrowser() throws Exception - { - _consumerConnection = getConnection(); - _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - - createConflationQueue(_producerSession, true); - _producer = _producerSession.createProducer(_queue); - - for (int msg = 0; msg < MSG_COUNT; msg++) - { - _producer.send(nextMessage(msg, _producerSession)); - _producerSession.commit(); - } - - _consumer = _consumerSession.createConsumer(_queue); - _consumerConnection.start(); - Message received; - List<Message> messages = new ArrayList<>(); - while((received = _consumer.receive(getReceiveTimeout())) != null) - { - messages.add(received); - } - - assertEquals("Unexpected number of messages received",10,messages.size()); - - for(int i = 0 ; i < 10; i++) - { - Message msg = messages.get(i); - assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); - } - - messages.clear(); - - _producer.send(nextMessage(MSG_COUNT, _producerSession)); - _producerSession.commit(); - - while((received = _consumer.receive(getReceiveTimeout())) != null) - { - messages.add(received); - } - assertEquals("Unexpected number of messages received",1,messages.size()); - assertEquals("Unexpected message number received", MSG_COUNT, messages.get(0).getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); - - - _producer.close(); - _producerSession.close(); - _producerConnection.close(); - } - - public void testConflation2Browsers() throws Exception - { - _consumerConnection = getConnection(); - _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - createConflationQueue(_producerSession, true); - _producer = _producerSession.createProducer(_queue); - - for (int msg = 0; msg < MSG_COUNT; msg++) - { - _producer.send(nextMessage(msg, _producerSession)); - _producerSession.commit(); - } - - _consumer = _consumerSession.createConsumer(_queue); - MessageConsumer consumer2 = _consumerSession.createConsumer(_queue); - - _consumerConnection.start(); - List<Message> messages = new ArrayList<>(); - List<Message> messages2 = new ArrayList<>(); - Message received = _consumer.receive(getReceiveTimeout()); - Message received2 = consumer2.receive(getReceiveTimeout()); - - while(received!=null || received2!=null) - { - if(received != null) - { - messages.add(received); - } - if(received2 != null) - { - messages2.add(received2); - } - - - received = _consumer.receive(getReceiveTimeout()); - received2 = consumer2.receive(getReceiveTimeout()); - - } - - assertEquals("Unexpected number of messages received on first browser",10,messages.size()); - assertEquals("Unexpected number of messages received on second browser",10,messages2.size()); - - for(int i = 0 ; i < 10; i++) - { - Message msg = messages.get(i); - assertEquals("Unexpected message number received on first browser", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); - msg = messages2.get(i); - assertEquals("Unexpected message number received on second browser", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); - } - - - _producer.close(); - _producerSession.close(); - _producerConnection.close(); - } - - public void testParallelProductionAndConsumption() throws Exception - { - createConflationQueue(_producerSession, false); - - // Start producing threads that send messages - BackgroundMessageProducer messageProducer1 = new BackgroundMessageProducer("Message sender1"); - messageProducer1.startSendingMessages(); - BackgroundMessageProducer messageProducer2 = new BackgroundMessageProducer("Message sender2"); - messageProducer2.startSendingMessages(); - - Map<String, Integer> lastReceivedMessages = receiveMessages(messageProducer1); - - messageProducer1.join(); - messageProducer2.join(); - - final Map<String, Integer> lastSentMessages1 = messageProducer1.getMessageSequenceNumbersByKey(); - assertEquals("Unexpected number of last sent messages sent by producer1", 2, lastSentMessages1.size()); - final Map<String, Integer> lastSentMessages2 = messageProducer2.getMessageSequenceNumbersByKey(); - assertEquals(lastSentMessages1, lastSentMessages2); - - assertEquals("The last message sent for each key should match the last message received for that key", - lastSentMessages1, lastReceivedMessages); - - assertNull("Unexpected exception from background producer thread", messageProducer1.getException()); - } - - private Map<String, Integer> receiveMessages(BackgroundMessageProducer producer) throws Exception - { - producer.waitUntilQuarterOfMessagesSentToEncourageConflation(); - - _consumerConnection = getConnectionWithPrefetch(1); - - _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - LOGGER.info("Starting to receive"); - - _consumer = _consumerSession.createConsumer(_queue); - _consumerConnection.start(); - - Map<String, Integer> messageSequenceNumbersByKey = new HashMap<>(); - - Message message; - int numberOfShutdownsReceived = 0; - int numberOfMessagesReceived = 0; - while(numberOfShutdownsReceived < 2) - { - message = _consumer.receive(getReceiveTimeout()); - assertNotNull("null received after " + numberOfMessagesReceived + " messages and " + numberOfShutdownsReceived + " shutdowns", message); - - if (message.propertyExists(BackgroundMessageProducer.SHUTDOWN)) - { - numberOfShutdownsReceived++; - } - else - { - numberOfMessagesReceived++; - putMessageInMap(message, messageSequenceNumbersByKey); - } - } - - LOGGER.info("Finished receiving. Received " + numberOfMessagesReceived + " message(s) in total"); - - return messageSequenceNumbersByKey; - } - - private void putMessageInMap(Message message, Map<String, Integer> messageSequenceNumbersByKey) throws JMSException - { - String keyValue = message.getStringProperty(KEY_PROPERTY); - Integer messageSequenceNumber = message.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY); - messageSequenceNumbersByKey.put(keyValue, messageSequenceNumber); - } - - private class BackgroundMessageProducer - { - static final String SHUTDOWN = "SHUTDOWN"; - - private final String _threadName; - - private volatile Exception _exception; - - private Thread _thread; - private Map<String, Integer> _messageSequenceNumbersByKey = new HashMap<>(); - private CountDownLatch _quarterOfMessagesSentLatch = new CountDownLatch(MSG_COUNT/4); - - public BackgroundMessageProducer(String threadName) - { - _threadName = threadName; - } - - public void waitUntilQuarterOfMessagesSentToEncourageConflation() throws InterruptedException - { - final long latchTimeout = 60000; - boolean success = _quarterOfMessagesSentLatch.await(latchTimeout, TimeUnit.MILLISECONDS); - assertTrue("Failed to be notified that 1/4 of the messages have been sent within " + latchTimeout + " ms.", success); - LOGGER.info("Quarter of messages sent"); - } - - public Exception getException() - { - return _exception; - } - - public Map<String, Integer> getMessageSequenceNumbersByKey() - { - return Collections.unmodifiableMap(_messageSequenceNumbersByKey); - } - - public void startSendingMessages() - { - Runnable messageSender = new Runnable() - { - @Override - public void run() - { - try - { - LOGGER.info("Starting to send in background thread"); - Connection producerConnection = getConnection(); - Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - MessageProducer backgroundProducer = producerSession.createProducer(_queue); - for (int messageNumber = 0; messageNumber < MSG_COUNT; messageNumber++) - { - Message message = nextMessage(messageNumber, producerSession, 2); - backgroundProducer.send(message); - - putMessageInMap(message, _messageSequenceNumbersByKey); - _quarterOfMessagesSentLatch.countDown(); - } - - Message shutdownMessage = producerSession.createMessage(); - shutdownMessage.setBooleanProperty(SHUTDOWN, true); - // make sure the shutdown messages have distinct keys because the Qpid Cpp Broker will - // otherwise consider them to have the same key. - shutdownMessage.setStringProperty(KEY_PROPERTY, _threadName); - - backgroundProducer.send(shutdownMessage); - - LOGGER.info("Finished sending in background thread"); - } - catch (Exception e) - { - _exception = e; - throw new RuntimeException(e); - } - } - }; - - _thread = new Thread(messageSender); - _thread.setName(_threadName); - _thread.start(); - } - - public void join() throws InterruptedException - { - final int timeoutInMillis = 120000; - _thread.join(timeoutInMillis); - assertFalse("Expected producer thread to finish within " + timeoutInMillis + "ms", _thread.isAlive()); - } - } - - private void createConflationQueue(Session session, final boolean enforceBrowseOnly) throws QpidException, JMSException - { - if(isBroker10()) - { - final Map<String, Object> arguments = new HashMap<>(); - arguments.put(LastValueQueue.LVQ_KEY, KEY_PROPERTY); - if(enforceBrowseOnly) - { - arguments.put("ensureNondestructiveConsumers", true); - } - createEntityUsingAmqpManagement(_queueName, session, "org.apache.qpid.LastValueQueue", arguments); - _queue = session.createQueue(_queueName); - } - else - { - String browserOnly = enforceBrowseOnly ? "mode: browse," : ""; - String addr = String.format("ADDR:%s; {create: always, %s" + - "node: {x-declare:{arguments : {'qpid.last_value_queue_key':'%s'}}}}", - _queueName, browserOnly, KEY_PROPERTY); - - _queue = session.createQueue(addr); - session.createConsumer(_queue).close(); - } - } - - private Message nextMessage(int msg, Session producerSession) throws JMSException - { - return nextMessage(msg, producerSession, 10); - } - - private Message nextMessage(int msg, Session producerSession, int numberOfUniqueKeyValues) throws JMSException - { - Message send = producerSession.createTextMessage("Message: " + msg); - - final String keyValue = String.valueOf(msg % numberOfUniqueKeyValues); - send.setStringProperty(KEY_PROPERTY, keyValue); - send.setIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY, msg); - - return send; - } -} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b1a7aacc/test-profiles/CPPExcludes ---------------------------------------------------------------------- diff --git a/test-profiles/CPPExcludes b/test-profiles/CPPExcludes index 054e5c9..9a249f2 100755 --- a/test-profiles/CPPExcludes +++ b/test-profiles/CPPExcludes @@ -190,7 +190,6 @@ org.apache.qpid.server.queue.FlowToDiskTest#* # Tests require AMQP management org.apache.qpid.server.routing.AlternateBindingRoutingTest#* -org.apache.qpid.server.queue.LastValueQueueTest#testConflatedQueueDepth org.apache.qpid.server.queue.QueueDepthWithSelectorTest#test org.apache.qpid.test.unit.message.UTF8Test#* --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
