Repository: qpid-broker-j Updated Branches: refs/heads/master 81a3391d7 -> 52ee3bff0
QPID-6933: [System Tests] Simplify LastValueQueueTest Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/52ee3bff Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/52ee3bff Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/52ee3bff Branch: refs/heads/master Commit: 52ee3bff008094e1bb95f65a0922e6be942003b8 Parents: 81a3391 Author: Alex Rudyy <[email protected]> Authored: Wed Jan 3 15:38:00 2018 +0000 Committer: Alex Rudyy <[email protected]> Committed: Wed Jan 3 15:38:00 2018 +0000 ---------------------------------------------------------------------- .../extensions/queue/LastValueQueueTest.java | 431 +++++++++---------- 1 file changed, 193 insertions(+), 238 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/52ee3bff/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/LastValueQueueTest.java ---------------------------------------------------------------------- diff --git a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/LastValueQueueTest.java b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/LastValueQueueTest.java index fc737c8..0c827eb 100644 --- a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/LastValueQueueTest.java +++ b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/LastValueQueueTest.java @@ -22,17 +22,17 @@ package org.apache.qpid.systests.jms_1_1.extensions.queue; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import javax.jms.Connection; @@ -59,15 +59,41 @@ public class LastValueQueueTest extends JmsTestBase private static final String KEY_PROPERTY = "key"; private static final int MSG_COUNT = 400; + private static final int NUMBER_OF_UNIQUE_KEY_VALUES = 10; @Test public void testConflation() throws Exception { - String queueName = getTestName(); - createConflationQueue(queueName, KEY_PROPERTY, false); - Queue queue = createQueue(queueName); + final String queueName = getTestName(); + final Queue queue = createConflationQueue(queueName, KEY_PROPERTY, false); + final Connection producerConnection = getConnection(); + try + { + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = producerSession.createProducer(queue); - sendMessages(queue, 0, MSG_COUNT); + Message message = producerSession.createMessage(); + + message.setStringProperty(KEY_PROPERTY, "A"); + message.setIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY, 1); + producer.send(message); + + message.setStringProperty(KEY_PROPERTY, "B"); + message.setIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY, 2); + producer.send(message); + + message.setStringProperty(KEY_PROPERTY, "A"); + message.setIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY, 3); + producer.send(message); + + message.setStringProperty(KEY_PROPERTY, "B"); + message.setIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY, 4); + producer.send(message); + } + finally + { + producerConnection.close(); + } Connection consumerConnection = getConnection(); try @@ -76,22 +102,21 @@ public class LastValueQueueTest extends JmsTestBase MessageConsumer consumer = consumerSession.createConsumer(queue); consumerConnection.start(); - List<Message> messages = new ArrayList<>(); - Message received; - while ((received = consumer.receive(getReceiveTimeout())) != null) - { - messages.add(received); - } - - assertEquals("Unexpected number of messages received", 10, messages.size()); - - for (int i = 0; i < 10; i++) - { - Message msg = messages.get(i); - assertEquals("Unexpected message number received", - MSG_COUNT - 10 + i, - msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); - } + Message received1 = consumer.receive(getReceiveTimeout()); + assertNotNull("First message is not received", received1); + assertEquals("Unexpected key property value", "A", received1.getStringProperty(KEY_PROPERTY)); + assertEquals("Unexpected sequence property value", + 3, + received1.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); + + Message received2 = consumer.receive(getReceiveTimeout()); + assertNotNull("Second message is not received", received2); + assertEquals("Unexpected key property value", "B", received2.getStringProperty(KEY_PROPERTY)); + assertEquals("Unexpected sequence property value", + 4, + received2.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); + + assertNull("Unexpected message is received", consumer.receive(getReceiveTimeout() / 4)); } finally { @@ -102,34 +127,25 @@ public class LastValueQueueTest extends JmsTestBase @Test public void testConflationWithRelease() throws Exception { - String queueName = getTestName(); - createConflationQueue(queueName, KEY_PROPERTY, false); - Queue queue = createQueue(queueName); + final String queueName = getTestName(); + final Queue queue = createConflationQueue(queueName, KEY_PROPERTY, false); sendMessages(queue, 0, MSG_COUNT / 2); Connection consumerConnection = getConnection(); try { - Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - MessageConsumer consumer = consumerSession.createConsumer(queue); + final Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + final MessageConsumer consumer = consumerSession.createConsumer(queue); consumerConnection.start(); - Message received; - List<Message> messages = new ArrayList<>(); - while ((received = consumer.receive(getReceiveTimeout())) != null) - { - messages.add(received); - } - - assertEquals("Unexpected number of messages received", 10, messages.size()); - - for (int i = 0; i < 10; i++) + for (int i = 0; i < NUMBER_OF_UNIQUE_KEY_VALUES; i++) { - Message msg = messages.get(i); - assertEquals("Unexpected message number received", - MSG_COUNT / 2 - 10 + i, - msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); + final Message received = consumer.receive(getReceiveTimeout()); + assertNotNull(String.format("Message with key %d is not received", i), received); + assertEquals("Unexpected message received", + MSG_COUNT / 2 - NUMBER_OF_UNIQUE_KEY_VALUES + i, + received.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); } } finally @@ -146,21 +162,13 @@ public class LastValueQueueTest extends JmsTestBase MessageConsumer consumer = consumerSession.createConsumer(queue); consumerConnection.start(); - Message received; - List<Message> messages = new ArrayList<>(); - while ((received = consumer.receive(getReceiveTimeout())) != null) - { - messages.add(received); - } - - assertEquals("Unexpected number of messages received", 10, messages.size()); - - for (int i = 0; i < 10; i++) + for (int i = 0; i < NUMBER_OF_UNIQUE_KEY_VALUES; i++) { - Message msg = messages.get(i); - assertEquals("Unexpected message number received", - MSG_COUNT - 10 + i, - msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); + final Message received = consumer.receive(getReceiveTimeout()); + assertNotNull(String.format("Message with key %d is not received", i), received); + assertEquals("Unexpected message received", + MSG_COUNT - NUMBER_OF_UNIQUE_KEY_VALUES + i, + received.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); } } finally @@ -172,40 +180,30 @@ public class LastValueQueueTest extends JmsTestBase @Test public void testConflationWithReleaseAfterNewPublish() throws Exception { - String queueName = getTestName(); - createConflationQueue(queueName, KEY_PROPERTY, false); - Queue queue = createQueue(queueName); + final String queueName = getTestName(); + final Queue queue = createConflationQueue(queueName, KEY_PROPERTY, false); sendMessages(queue, 0, MSG_COUNT / 2); Connection consumerConnection = getConnection(); try { - Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - MessageConsumer consumer = consumerSession.createConsumer(queue); + final Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + final MessageConsumer consumer = consumerSession.createConsumer(queue); consumerConnection.start(); - Message received; - List<Message> messages = new ArrayList<>(); - while ((received = consumer.receive(getReceiveTimeout())) != null) - { - messages.add(received); - } - - assertEquals("Unexpected number of messages received", 10, messages.size()); - - for (int i = 0; i < 10; i++) + for (int i = 0; i < NUMBER_OF_UNIQUE_KEY_VALUES; i++) { - Message msg = messages.get(i); - assertEquals("Unexpected message number received", - MSG_COUNT / 2 - 10 + i, - msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); + final Message received = consumer.receive(getReceiveTimeout()); + assertNotNull(String.format("Message with key %d is not received", i), received); + assertEquals("Unexpected message received", + MSG_COUNT / 2 - NUMBER_OF_UNIQUE_KEY_VALUES + i, + received.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); } - consumer.close(); - sendMessages(queue, MSG_COUNT / 2, MSG_COUNT); + consumer.close(); consumerSession.close(); } finally @@ -216,25 +214,18 @@ public class LastValueQueueTest extends JmsTestBase consumerConnection = getConnection(); try { - Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = consumerSession.createConsumer(queue); consumerConnection.start(); - Message received; - List<Message> messages = new ArrayList<>(); - while ((received = consumer.receive(getReceiveTimeout())) != null) - { - messages.add(received); - } - - assertEquals("Unexpected number of messages received", 10, messages.size()); + final Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final MessageConsumer consumer = consumerSession.createConsumer(queue); - for (int i = 0; i < 10; i++) + for (int i = 0; i < NUMBER_OF_UNIQUE_KEY_VALUES; i++) { - Message msg = messages.get(i); - assertEquals("Unexpected message number received", - MSG_COUNT - 10 + i, - msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); + final Message received = consumer.receive(getReceiveTimeout()); + assertNotNull(String.format("Message with key %d is not received", i), received); + assertEquals("Unexpected message received", + MSG_COUNT - NUMBER_OF_UNIQUE_KEY_VALUES + i, + received.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); } } finally @@ -246,62 +237,45 @@ public class LastValueQueueTest extends JmsTestBase @Test public void testConflatedQueueDepth() throws Exception { - String queueName = getTestName(); - createConflationQueue(queueName, KEY_PROPERTY, false); - Queue queue = createQueue(queueName); + final String queueName = getTestName(); + final Queue queue = createConflationQueue(queueName, KEY_PROPERTY, false); sendMessages(queue, 0, MSG_COUNT); - final long queueDepth = getTotalDepthOfQueuesMessages(); - - assertEquals(10, queueDepth); + assertEquals(NUMBER_OF_UNIQUE_KEY_VALUES, getTotalDepthOfQueuesMessages()); } @Test public void testConflationBrowser() throws Exception { - String queueName = getTestName(); - createConflationQueue(queueName, KEY_PROPERTY, true); - Queue queue = createQueue(queueName); + final String queueName = getTestName(); + final Queue queue = createConflationQueue(queueName, KEY_PROPERTY, true); sendMessages(queue, 0, MSG_COUNT); - Connection consumerConnection = getConnection(); + final Connection consumerConnection = getConnection(); try { - Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = consumerSession.createConsumer(queue); + final MessageConsumer consumer = consumerSession.createConsumer(queue); consumerConnection.start(); - Message received; - List<Message> messages = new ArrayList<>(); - while ((received = consumer.receive(getReceiveTimeout())) != null) - { - messages.add(received); - } - - assertEquals("Unexpected number of messages received", 10, messages.size()); - - for (int i = 0; i < 10; i++) + for (int i = 0; i < NUMBER_OF_UNIQUE_KEY_VALUES; i++) { - Message msg = messages.get(i); - assertEquals("Unexpected message number received", - MSG_COUNT - 10 + i, - msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); + final Message received = consumer.receive(getReceiveTimeout()); + assertNotNull(String.format("Message with key %d is not received", i), received); + assertEquals("Unexpected message received", + MSG_COUNT - NUMBER_OF_UNIQUE_KEY_VALUES + i, + received.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); } - messages.clear(); - sendMessages(queue, MSG_COUNT, MSG_COUNT + 1); - while ((received = consumer.receive(getReceiveTimeout())) != null) - { - messages.add(received); - } - assertEquals("Unexpected number of messages received", 1, messages.size()); - assertEquals("Unexpected message number received", + final Message received = consumer.receive(getReceiveTimeout()); + assertNotNull(String.format("Message with key %d is not received", 0), received); + assertEquals("Unexpected message received", MSG_COUNT, - messages.get(0).getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); + received.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); } finally { @@ -312,53 +286,33 @@ public class LastValueQueueTest extends JmsTestBase @Test public void testConflation2Browsers() throws Exception { - String queueName = getTestName(); - createConflationQueue(queueName, KEY_PROPERTY, true); - Queue queue = createQueue(queueName); + final String queueName = getTestName(); + final Queue queue = createConflationQueue(queueName, KEY_PROPERTY, true); sendMessages(queue, 0, MSG_COUNT); - Connection consumerConnection = getConnection(); + final Connection consumerConnection = getConnection(); try { - Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = consumerSession.createConsumer(queue); - MessageConsumer consumer2 = consumerSession.createConsumer(queue); + final Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final MessageConsumer consumer = consumerSession.createConsumer(queue); + final MessageConsumer consumer2 = consumerSession.createConsumer(queue); consumerConnection.start(); - List<Message> messages = new ArrayList<>(); - List<Message> messages2 = new ArrayList<>(); - Message received = consumer.receive(getReceiveTimeout()); - Message received2 = consumer2.receive(getReceiveTimeout()); - while (received != null || received2 != null) + for (int i = 0; i < NUMBER_OF_UNIQUE_KEY_VALUES; i++) { - if (received != null) - { - messages.add(received); - } - if (received2 != null) - { - messages2.add(received2); - } - - received = consumer.receive(getReceiveTimeout()); - received2 = consumer2.receive(getReceiveTimeout()); - } - - assertEquals("Unexpected number of messages received on first browser", 10, messages.size()); - assertEquals("Unexpected number of messages received on second browser", 10, messages2.size()); - - for (int i = 0; i < 10; i++) - { - Message msg = messages.get(i); - assertEquals("Unexpected message number received on first browser", - MSG_COUNT - 10 + i, - msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); - msg = messages2.get(i); - assertEquals("Unexpected message number received on second browser", - MSG_COUNT - 10 + i, - msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); + final Message received = consumer.receive(getReceiveTimeout()); + assertNotNull(String.format("Message with key %d is not received by first consumer", i), received); + assertEquals("Unexpected message received by first consumer", + MSG_COUNT - NUMBER_OF_UNIQUE_KEY_VALUES + i, + received.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); + + final Message received2 = consumer2.receive(getReceiveTimeout()); + assertNotNull(String.format("Message with key %d is not received by second consumer", i), received2); + assertEquals("Unexpected message received by second consumer", + MSG_COUNT - NUMBER_OF_UNIQUE_KEY_VALUES + i, + received2.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY)); } } finally @@ -370,30 +324,42 @@ public class LastValueQueueTest extends JmsTestBase @Test public void testParallelProductionAndConsumption() throws Exception { - String queueName = getTestName(); - createConflationQueue(queueName, KEY_PROPERTY, true); - Queue queue = createQueue(queueName); + final String queueName = getTestName(); + final Queue queue = createConflationQueue(queueName, KEY_PROPERTY, true); - // Start producing threads that send messages - BackgroundMessageProducer messageProducer1 = new BackgroundMessageProducer("Message sender1", queue); - messageProducer1.startSendingMessages(); - BackgroundMessageProducer messageProducer2 = new BackgroundMessageProducer("Message sender2", queue); - messageProducer2.startSendingMessages(); + int numberOfUniqueKeyValues = 2; + final ExecutorService executorService = Executors.newFixedThreadPool(2); + try + { + // Start producing threads that send messages + final BackgroundMessageProducer messageProducer1 = new BackgroundMessageProducer(queue, + numberOfUniqueKeyValues); + final BackgroundMessageProducer messageProducer2 = new BackgroundMessageProducer(queue, + numberOfUniqueKeyValues); - Map<String, Integer> lastReceivedMessages = receiveMessages(messageProducer1, queue); + final Future<?> future1 = executorService.submit(messageProducer1); + final Future<?> future2 = executorService.submit(messageProducer2); - messageProducer1.join(); - messageProducer2.join(); + final Map<String, Integer> lastReceivedMessages = receiveMessages(messageProducer1, queue); - final Map<String, Integer> lastSentMessages1 = messageProducer1.getMessageSequenceNumbersByKey(); - assertEquals("Unexpected number of last sent messages sent by producer1", 2, lastSentMessages1.size()); - final Map<String, Integer> lastSentMessages2 = messageProducer2.getMessageSequenceNumbersByKey(); - assertEquals(lastSentMessages1, lastSentMessages2); + future1.get(getReceiveTimeout() * MSG_COUNT, TimeUnit.MILLISECONDS); + future2.get(getReceiveTimeout() * MSG_COUNT, TimeUnit.MILLISECONDS); - assertEquals("The last message sent for each key should match the last message received for that key", - lastSentMessages1, lastReceivedMessages); + final Map<String, Integer> lastSentMessages1 = messageProducer1.getMessageSequenceNumbersByKey(); + assertEquals("Unexpected number of last sent messages sent by producer1", + numberOfUniqueKeyValues, lastSentMessages1.size()); + final Map<String, Integer> lastSentMessages2 = messageProducer2.getMessageSequenceNumbersByKey(); + assertEquals(lastSentMessages1, lastSentMessages2); - assertNull("Unexpected exception from background producer thread", messageProducer1.getException()); + assertEquals("The last message sent for each key should match the last message received for that key", + lastSentMessages1, lastReceivedMessages); + + assertNull("Unexpected exception from background producer thread", messageProducer1.getException()); + } + finally + { + executorService.shutdown(); + } } private Map<String, Integer> receiveMessages(BackgroundMessageProducer producer, final Queue queue) throws Exception @@ -405,11 +371,11 @@ public class LastValueQueueTest extends JmsTestBase try { - Session _consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); LOGGER.info("Starting to receive"); - MessageConsumer _consumer = _consumerSession.createConsumer(queue); + MessageConsumer consumer = consumerSession.createConsumer(queue); consumerConnection.start(); Message message; @@ -417,7 +383,7 @@ public class LastValueQueueTest extends JmsTestBase int numberOfMessagesReceived = 0; while (numberOfShutdownsReceived < 2) { - message = _consumer.receive(getReceiveTimeout()); + message = consumer.receive(getReceiveTimeout()); assertNotNull("null received after " + numberOfMessagesReceived + " messages and " @@ -451,23 +417,23 @@ public class LastValueQueueTest extends JmsTestBase messageSequenceNumbersByKey.put(keyValue, messageSequenceNumber); } - private class BackgroundMessageProducer + private final class BackgroundMessageProducer implements Runnable { static final String SHUTDOWN = "SHUTDOWN"; - private final String _threadName; + private final Queue _queue; private volatile Exception _exception; - private Thread _thread; private Map<String, Integer> _messageSequenceNumbersByKey = new HashMap<>(); private CountDownLatch _quarterOfMessagesSentLatch = new CountDownLatch(MSG_COUNT / 4); + private int _numberOfUniqueKeyValues; - BackgroundMessageProducer(String threadName, Queue queue) + BackgroundMessageProducer(Queue queue, final int numberOfUniqueKeyValues) { - _threadName = threadName; _queue = queue; + _numberOfUniqueKeyValues = numberOfUniqueKeyValues; } void waitUntilQuarterOfMessagesSentToEncourageConflation() throws InterruptedException @@ -489,68 +455,56 @@ public class LastValueQueueTest extends JmsTestBase return Collections.unmodifiableMap(_messageSequenceNumbersByKey); } - void startSendingMessages() + @Override + public void run() { - Runnable messageSender = () -> { + try + { + LOGGER.info("Starting to send in background thread"); + final Connection producerConnection = getConnection(); try { - LOGGER.info("Starting to send in background thread"); - Connection producerConnection = getConnection(); - try - { - Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - MessageProducer backgroundProducer = producerSession.createProducer(_queue); - for (int messageNumber = 0; messageNumber < MSG_COUNT; messageNumber++) - { - Message message = nextMessage(messageNumber, producerSession, 2); - backgroundProducer.send(message); + final Session producerSession = producerConnection.createSession(true, Session.SESSION_TRANSACTED); - putMessageInMap(message, _messageSequenceNumbersByKey); - _quarterOfMessagesSentLatch.countDown(); - } - - Message shutdownMessage = producerSession.createMessage(); - shutdownMessage.setBooleanProperty(SHUTDOWN, true); - // make sure the shutdown messages have distinct keys because the Qpid Cpp Broker will - // otherwise consider them to have the same key. - shutdownMessage.setStringProperty(KEY_PROPERTY, _threadName); + final MessageProducer backgroundProducer = producerSession.createProducer(_queue); + for (int messageNumber = 0; messageNumber < MSG_COUNT; messageNumber++) + { - backgroundProducer.send(shutdownMessage); + final Message message = nextMessage(messageNumber, producerSession, _numberOfUniqueKeyValues); + backgroundProducer.send(message); + producerSession.commit(); - // make sure that all in-flight messages reach the Broker - // before closing the connection - producerSession.createTemporaryQueue().delete(); - } - finally - { - producerConnection.close(); + putMessageInMap(message, _messageSequenceNumbersByKey); + _quarterOfMessagesSentLatch.countDown(); } - LOGGER.info("Finished sending in background thread"); + Message shutdownMessage = producerSession.createMessage(); + shutdownMessage.setBooleanProperty(SHUTDOWN, true); + // make sure the shutdown messages have distinct keys because the Qpid Cpp Broker will + // otherwise consider them to have the same key. + shutdownMessage.setStringProperty(KEY_PROPERTY, Thread.currentThread().getName()); + + backgroundProducer.send(shutdownMessage); + producerSession.commit(); } - catch (Exception e) + finally { - _exception = e; - LOGGER.warn("Unexpected exception in publisher", e); + producerConnection.close(); } - }; - _thread = new Thread(messageSender); - _thread.setName(_threadName); - _thread.start(); + LOGGER.info("Finished sending in background thread"); + } + catch (Exception e) + { + _exception = e; + LOGGER.warn("Unexpected exception in publisher", e); + } } - void join() throws InterruptedException - { - final int timeoutInMillis = 120000; - _thread.join(timeoutInMillis); - assertFalse("Expected producer thread to finish within " + timeoutInMillis + "ms", _thread.isAlive()); - } } - private void createConflationQueue(final String queueName, - final String keyProperty, final boolean enforceBrowseOnly) throws Exception + private Queue createConflationQueue(final String queueName, + final String keyProperty, final boolean enforceBrowseOnly) throws Exception { final Map<String, Object> arguments = new HashMap<>(); arguments.put(LastValueQueue.LVQ_KEY, keyProperty); @@ -559,11 +513,12 @@ public class LastValueQueueTest extends JmsTestBase arguments.put("ensureNondestructiveConsumers", true); } createEntityUsingAmqpManagement(queueName, "org.apache.qpid.LastValueQueue", arguments); + return createQueue(queueName); } private Message nextMessage(int msg, Session producerSession) throws JMSException { - return nextMessage(msg, producerSession, 10); + return nextMessage(msg, producerSession, NUMBER_OF_UNIQUE_KEY_VALUES); } private Message nextMessage(int msg, Session producerSession, int numberOfUniqueKeyValues) throws JMSException --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
