Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 81a3391d7 -> 52ee3bff0


QPID-6933: [System Tests] Simplify LastValueQueueTest


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/52ee3bff
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/52ee3bff
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/52ee3bff

Branch: refs/heads/master
Commit: 52ee3bff008094e1bb95f65a0922e6be942003b8
Parents: 81a3391
Author: Alex Rudyy <[email protected]>
Authored: Wed Jan 3 15:38:00 2018 +0000
Committer: Alex Rudyy <[email protected]>
Committed: Wed Jan 3 15:38:00 2018 +0000

----------------------------------------------------------------------
 .../extensions/queue/LastValueQueueTest.java    | 431 +++++++++----------
 1 file changed, 193 insertions(+), 238 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/52ee3bff/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/LastValueQueueTest.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/LastValueQueueTest.java b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/LastValueQueueTest.java
index fc737c8..0c827eb 100644
--- a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/LastValueQueueTest.java
+++ b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/LastValueQueueTest.java
@@ -22,17 +22,17 @@
 package org.apache.qpid.systests.jms_1_1.extensions.queue;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import javax.jms.Connection;
@@ -59,15 +59,41 @@ public class LastValueQueueTest extends JmsTestBase
     private static final String KEY_PROPERTY = "key";
 
     private static final int MSG_COUNT = 400;
+    private static final int NUMBER_OF_UNIQUE_KEY_VALUES = 10;
 
     @Test
     public void testConflation() throws Exception
     {
-        String queueName = getTestName();
-        createConflationQueue(queueName, KEY_PROPERTY, false);
-        Queue queue = createQueue(queueName);
+        final String queueName = getTestName();
+        final Queue queue = createConflationQueue(queueName, KEY_PROPERTY, 
false);
+        final Connection producerConnection = getConnection();
+        try
+        {
+            Session producerSession = producerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = producerSession.createProducer(queue);
 
-        sendMessages(queue, 0, MSG_COUNT);
+            Message message = producerSession.createMessage();
+
+            message.setStringProperty(KEY_PROPERTY, "A");
+            message.setIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY, 1);
+            producer.send(message);
+
+            message.setStringProperty(KEY_PROPERTY, "B");
+            message.setIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY, 2);
+            producer.send(message);
+
+            message.setStringProperty(KEY_PROPERTY, "A");
+            message.setIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY, 3);
+            producer.send(message);
+
+            message.setStringProperty(KEY_PROPERTY, "B");
+            message.setIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY, 4);
+            producer.send(message);
+        }
+        finally
+        {
+            producerConnection.close();
+        }
 
         Connection consumerConnection = getConnection();
         try
@@ -76,22 +102,21 @@ public class LastValueQueueTest extends JmsTestBase
             MessageConsumer consumer = consumerSession.createConsumer(queue);
             consumerConnection.start();
 
-            List<Message> messages = new ArrayList<>();
-            Message received;
-            while ((received = consumer.receive(getReceiveTimeout())) != null)
-            {
-                messages.add(received);
-            }
-
-            assertEquals("Unexpected number of messages received", 10, 
messages.size());
-
-            for (int i = 0; i < 10; i++)
-            {
-                Message msg = messages.get(i);
-                assertEquals("Unexpected message number received",
-                             MSG_COUNT - 10 + i,
-                             
msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
-            }
+            Message received1 = consumer.receive(getReceiveTimeout());
+            assertNotNull("First message is not received", received1);
+            assertEquals("Unexpected key property value", "A", 
received1.getStringProperty(KEY_PROPERTY));
+            assertEquals("Unexpected sequence property value",
+                         3,
+                         
received1.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
+
+            Message received2 = consumer.receive(getReceiveTimeout());
+            assertNotNull("Second message is not received", received2);
+            assertEquals("Unexpected key property value", "B", 
received2.getStringProperty(KEY_PROPERTY));
+            assertEquals("Unexpected sequence property value",
+                         4,
+                         
received2.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
+
+            assertNull("Unexpected message is received", 
consumer.receive(getReceiveTimeout() / 4));
         }
         finally
         {
@@ -102,34 +127,25 @@ public class LastValueQueueTest extends JmsTestBase
     @Test
     public void testConflationWithRelease() throws Exception
     {
-        String queueName = getTestName();
-        createConflationQueue(queueName, KEY_PROPERTY, false);
-        Queue queue = createQueue(queueName);
+        final String queueName = getTestName();
+        final Queue queue = createConflationQueue(queueName, KEY_PROPERTY, 
false);
 
         sendMessages(queue, 0, MSG_COUNT / 2);
 
         Connection consumerConnection = getConnection();
         try
         {
-            Session consumerSession = consumerConnection.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
-            MessageConsumer consumer = consumerSession.createConsumer(queue);
+            final Session consumerSession = 
consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+            final MessageConsumer consumer = 
consumerSession.createConsumer(queue);
             consumerConnection.start();
 
-            Message received;
-            List<Message> messages = new ArrayList<>();
-            while ((received = consumer.receive(getReceiveTimeout())) != null)
-            {
-                messages.add(received);
-            }
-
-            assertEquals("Unexpected number of messages received", 10, 
messages.size());
-
-            for (int i = 0; i < 10; i++)
+            for (int i = 0; i < NUMBER_OF_UNIQUE_KEY_VALUES; i++)
             {
-                Message msg = messages.get(i);
-                assertEquals("Unexpected message number received",
-                             MSG_COUNT / 2 - 10 + i,
-                             
msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
+                final Message received = consumer.receive(getReceiveTimeout());
+                assertNotNull(String.format("Message with key %d is not 
received", i), received);
+                assertEquals("Unexpected message received",
+                             MSG_COUNT / 2 - NUMBER_OF_UNIQUE_KEY_VALUES + i,
+                             
received.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
             }
         }
         finally
@@ -146,21 +162,13 @@ public class LastValueQueueTest extends JmsTestBase
             MessageConsumer consumer = consumerSession.createConsumer(queue);
             consumerConnection.start();
 
-            Message received;
-            List<Message> messages = new ArrayList<>();
-            while ((received = consumer.receive(getReceiveTimeout())) != null)
-            {
-                messages.add(received);
-            }
-
-            assertEquals("Unexpected number of messages received", 10, 
messages.size());
-
-            for (int i = 0; i < 10; i++)
+            for (int i = 0; i < NUMBER_OF_UNIQUE_KEY_VALUES; i++)
             {
-                Message msg = messages.get(i);
-                assertEquals("Unexpected message number received",
-                             MSG_COUNT - 10 + i,
-                             
msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
+                final Message received = consumer.receive(getReceiveTimeout());
+                assertNotNull(String.format("Message with key %d is not 
received", i), received);
+                assertEquals("Unexpected message received",
+                             MSG_COUNT - NUMBER_OF_UNIQUE_KEY_VALUES + i,
+                             
received.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
             }
         }
         finally
@@ -172,40 +180,30 @@ public class LastValueQueueTest extends JmsTestBase
     @Test
     public void testConflationWithReleaseAfterNewPublish() throws Exception
     {
-        String queueName = getTestName();
-        createConflationQueue(queueName, KEY_PROPERTY, false);
-        Queue queue = createQueue(queueName);
+        final String queueName = getTestName();
+        final Queue queue = createConflationQueue(queueName, KEY_PROPERTY, 
false);
 
         sendMessages(queue, 0, MSG_COUNT / 2);
 
         Connection consumerConnection = getConnection();
         try
         {
-            Session consumerSession = consumerConnection.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
-            MessageConsumer consumer = consumerSession.createConsumer(queue);
+            final Session consumerSession = 
consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+            final MessageConsumer consumer = 
consumerSession.createConsumer(queue);
             consumerConnection.start();
 
-            Message received;
-            List<Message> messages = new ArrayList<>();
-            while ((received = consumer.receive(getReceiveTimeout())) != null)
-            {
-                messages.add(received);
-            }
-
-            assertEquals("Unexpected number of messages received", 10, 
messages.size());
-
-            for (int i = 0; i < 10; i++)
+            for (int i = 0; i < NUMBER_OF_UNIQUE_KEY_VALUES; i++)
             {
-                Message msg = messages.get(i);
-                assertEquals("Unexpected message number received",
-                             MSG_COUNT / 2 - 10 + i,
-                             
msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
+                final Message received = consumer.receive(getReceiveTimeout());
+                assertNotNull(String.format("Message with key %d is not 
received", i), received);
+                assertEquals("Unexpected message received",
+                             MSG_COUNT / 2 - NUMBER_OF_UNIQUE_KEY_VALUES + i,
+                             
received.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
             }
 
-            consumer.close();
-
             sendMessages(queue, MSG_COUNT / 2, MSG_COUNT);
 
+            consumer.close();
             consumerSession.close();
         }
         finally
@@ -216,25 +214,18 @@ public class LastValueQueueTest extends JmsTestBase
         consumerConnection = getConnection();
         try
         {
-            Session consumerSession = consumerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-            MessageConsumer consumer = consumerSession.createConsumer(queue);
             consumerConnection.start();
 
-            Message received;
-            List<Message> messages = new ArrayList<>();
-            while ((received = consumer.receive(getReceiveTimeout())) != null)
-            {
-                messages.add(received);
-            }
-
-            assertEquals("Unexpected number of messages received", 10, 
messages.size());
+            final Session consumerSession = 
consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            final MessageConsumer consumer = 
consumerSession.createConsumer(queue);
 
-            for (int i = 0; i < 10; i++)
+            for (int i = 0; i < NUMBER_OF_UNIQUE_KEY_VALUES; i++)
             {
-                Message msg = messages.get(i);
-                assertEquals("Unexpected message number received",
-                             MSG_COUNT - 10 + i,
-                             
msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
+                final Message received = consumer.receive(getReceiveTimeout());
+                assertNotNull(String.format("Message with key %d is not 
received", i), received);
+                assertEquals("Unexpected message received",
+                             MSG_COUNT - NUMBER_OF_UNIQUE_KEY_VALUES + i,
+                             
received.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
             }
         }
         finally
@@ -246,62 +237,45 @@ public class LastValueQueueTest extends JmsTestBase
     @Test
     public void testConflatedQueueDepth() throws Exception
     {
-        String queueName = getTestName();
-        createConflationQueue(queueName, KEY_PROPERTY, false);
-        Queue queue = createQueue(queueName);
+        final String queueName = getTestName();
+        final Queue queue = createConflationQueue(queueName, KEY_PROPERTY, 
false);
 
         sendMessages(queue, 0, MSG_COUNT);
 
-        final long queueDepth = getTotalDepthOfQueuesMessages();
-
-        assertEquals(10, queueDepth);
+        assertEquals(NUMBER_OF_UNIQUE_KEY_VALUES, 
getTotalDepthOfQueuesMessages());
     }
 
     @Test
     public void testConflationBrowser() throws Exception
     {
-        String queueName = getTestName();
-        createConflationQueue(queueName, KEY_PROPERTY, true);
-        Queue queue = createQueue(queueName);
+        final String queueName = getTestName();
+        final Queue queue = createConflationQueue(queueName, KEY_PROPERTY, 
true);
 
         sendMessages(queue, 0, MSG_COUNT);
 
-        Connection consumerConnection = getConnection();
+        final Connection consumerConnection = getConnection();
         try
         {
-            Session consumerSession = consumerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            final Session consumerSession = 
consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
-            MessageConsumer consumer = consumerSession.createConsumer(queue);
+            final MessageConsumer consumer = 
consumerSession.createConsumer(queue);
             consumerConnection.start();
-            Message received;
-            List<Message> messages = new ArrayList<>();
-            while ((received = consumer.receive(getReceiveTimeout())) != null)
-            {
-                messages.add(received);
-            }
-
-            assertEquals("Unexpected number of messages received", 10, 
messages.size());
-
-            for (int i = 0; i < 10; i++)
+            for (int i = 0; i < NUMBER_OF_UNIQUE_KEY_VALUES; i++)
             {
-                Message msg = messages.get(i);
-                assertEquals("Unexpected message number received",
-                             MSG_COUNT - 10 + i,
-                             
msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
+                final Message received = consumer.receive(getReceiveTimeout());
+                assertNotNull(String.format("Message with key %d is not 
received", i), received);
+                assertEquals("Unexpected message received",
+                             MSG_COUNT - NUMBER_OF_UNIQUE_KEY_VALUES + i,
+                             
received.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
             }
 
-            messages.clear();
-
             sendMessages(queue, MSG_COUNT, MSG_COUNT + 1);
 
-            while ((received = consumer.receive(getReceiveTimeout())) != null)
-            {
-                messages.add(received);
-            }
-            assertEquals("Unexpected number of messages received", 1, 
messages.size());
-            assertEquals("Unexpected message number received",
+            final Message received = consumer.receive(getReceiveTimeout());
+            assertNotNull(String.format("Message with key %d is not received", 
0), received);
+            assertEquals("Unexpected message received",
                          MSG_COUNT,
-                         
messages.get(0).getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
+                         
received.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
         }
         finally
         {
@@ -312,53 +286,33 @@ public class LastValueQueueTest extends JmsTestBase
     @Test
     public void testConflation2Browsers() throws Exception
     {
-        String queueName = getTestName();
-        createConflationQueue(queueName, KEY_PROPERTY, true);
-        Queue queue = createQueue(queueName);
+        final String queueName = getTestName();
+        final Queue queue = createConflationQueue(queueName, KEY_PROPERTY, 
true);
 
         sendMessages(queue, 0, MSG_COUNT);
 
-        Connection consumerConnection = getConnection();
+        final Connection consumerConnection = getConnection();
         try
         {
-            Session consumerSession = consumerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-            MessageConsumer consumer = consumerSession.createConsumer(queue);
-            MessageConsumer consumer2 = consumerSession.createConsumer(queue);
+            final Session consumerSession = 
consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            final MessageConsumer consumer = 
consumerSession.createConsumer(queue);
+            final MessageConsumer consumer2 = 
consumerSession.createConsumer(queue);
 
             consumerConnection.start();
-            List<Message> messages = new ArrayList<>();
-            List<Message> messages2 = new ArrayList<>();
-            Message received = consumer.receive(getReceiveTimeout());
-            Message received2 = consumer2.receive(getReceiveTimeout());
 
-            while (received != null || received2 != null)
+            for (int i = 0; i < NUMBER_OF_UNIQUE_KEY_VALUES; i++)
             {
-                if (received != null)
-                {
-                    messages.add(received);
-                }
-                if (received2 != null)
-                {
-                    messages2.add(received2);
-                }
-
-                received = consumer.receive(getReceiveTimeout());
-                received2 = consumer2.receive(getReceiveTimeout());
-            }
-
-            assertEquals("Unexpected number of messages received on first 
browser", 10, messages.size());
-            assertEquals("Unexpected number of messages received on second 
browser", 10, messages2.size());
-
-            for (int i = 0; i < 10; i++)
-            {
-                Message msg = messages.get(i);
-                assertEquals("Unexpected message number received on first 
browser",
-                             MSG_COUNT - 10 + i,
-                             
msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
-                msg = messages2.get(i);
-                assertEquals("Unexpected message number received on second 
browser",
-                             MSG_COUNT - 10 + i,
-                             
msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
+                final Message received = consumer.receive(getReceiveTimeout());
+                assertNotNull(String.format("Message with key %d is not 
received by first consumer", i), received);
+                assertEquals("Unexpected message received by first consumer",
+                             MSG_COUNT - NUMBER_OF_UNIQUE_KEY_VALUES + i,
+                             
received.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
+
+                final Message received2 = 
consumer2.receive(getReceiveTimeout());
+                assertNotNull(String.format("Message with key %d is not 
received by second consumer", i), received2);
+                assertEquals("Unexpected message received by second consumer",
+                             MSG_COUNT - NUMBER_OF_UNIQUE_KEY_VALUES + i,
+                             
received2.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
             }
         }
         finally
@@ -370,30 +324,42 @@ public class LastValueQueueTest extends JmsTestBase
     @Test
     public void testParallelProductionAndConsumption() throws Exception
     {
-        String queueName = getTestName();
-        createConflationQueue(queueName, KEY_PROPERTY, true);
-        Queue queue = createQueue(queueName);
+        final String queueName = getTestName();
+        final Queue queue = createConflationQueue(queueName, KEY_PROPERTY, 
true);
 
-        // Start producing threads that send messages
-        BackgroundMessageProducer messageProducer1 = new 
BackgroundMessageProducer("Message sender1", queue);
-        messageProducer1.startSendingMessages();
-        BackgroundMessageProducer messageProducer2 = new 
BackgroundMessageProducer("Message sender2", queue);
-        messageProducer2.startSendingMessages();
+        int numberOfUniqueKeyValues = 2;
+        final ExecutorService executorService = 
Executors.newFixedThreadPool(2);
+        try
+        {
+            // Start producing threads that send messages
+            final BackgroundMessageProducer messageProducer1 = new 
BackgroundMessageProducer(queue,
+                                                                               
              numberOfUniqueKeyValues);
+            final BackgroundMessageProducer messageProducer2 = new 
BackgroundMessageProducer(queue,
+                                                                               
              numberOfUniqueKeyValues);
 
-        Map<String, Integer> lastReceivedMessages = 
receiveMessages(messageProducer1, queue);
+            final Future<?> future1 = executorService.submit(messageProducer1);
+            final Future<?> future2 = executorService.submit(messageProducer2);
 
-        messageProducer1.join();
-        messageProducer2.join();
+            final Map<String, Integer> lastReceivedMessages = 
receiveMessages(messageProducer1, queue);
 
-        final Map<String, Integer> lastSentMessages1 = 
messageProducer1.getMessageSequenceNumbersByKey();
-        assertEquals("Unexpected number of last sent messages sent by 
producer1", 2, lastSentMessages1.size());
-        final Map<String, Integer> lastSentMessages2 = 
messageProducer2.getMessageSequenceNumbersByKey();
-        assertEquals(lastSentMessages1, lastSentMessages2);
+            future1.get(getReceiveTimeout() * MSG_COUNT, 
TimeUnit.MILLISECONDS);
+            future2.get(getReceiveTimeout() * MSG_COUNT, 
TimeUnit.MILLISECONDS);
 
-        assertEquals("The last message sent for each key should match the last 
message received for that key",
-                     lastSentMessages1, lastReceivedMessages);
+            final Map<String, Integer> lastSentMessages1 = 
messageProducer1.getMessageSequenceNumbersByKey();
+            assertEquals("Unexpected number of last sent messages sent by 
producer1",
+                         numberOfUniqueKeyValues, lastSentMessages1.size());
+            final Map<String, Integer> lastSentMessages2 = 
messageProducer2.getMessageSequenceNumbersByKey();
+            assertEquals(lastSentMessages1, lastSentMessages2);
 
-        assertNull("Unexpected exception from background producer thread", 
messageProducer1.getException());
+            assertEquals("The last message sent for each key should match the 
last message received for that key",
+                         lastSentMessages1, lastReceivedMessages);
+
+            assertNull("Unexpected exception from background producer thread", 
messageProducer1.getException());
+        }
+        finally
+        {
+            executorService.shutdown();
+        }
     }
 
     private Map<String, Integer> receiveMessages(BackgroundMessageProducer 
producer, final Queue queue) throws Exception
@@ -405,11 +371,11 @@ public class LastValueQueueTest extends JmsTestBase
         try
         {
 
-            Session _consumerSession = consumerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            Session consumerSession = consumerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
 
             LOGGER.info("Starting to receive");
 
-            MessageConsumer _consumer = _consumerSession.createConsumer(queue);
+            MessageConsumer consumer = consumerSession.createConsumer(queue);
             consumerConnection.start();
 
             Message message;
@@ -417,7 +383,7 @@ public class LastValueQueueTest extends JmsTestBase
             int numberOfMessagesReceived = 0;
             while (numberOfShutdownsReceived < 2)
             {
-                message = _consumer.receive(getReceiveTimeout());
+                message = consumer.receive(getReceiveTimeout());
                 assertNotNull("null received after "
                               + numberOfMessagesReceived
                               + " messages and "
@@ -451,23 +417,23 @@ public class LastValueQueueTest extends JmsTestBase
         messageSequenceNumbersByKey.put(keyValue, messageSequenceNumber);
     }
 
-    private class BackgroundMessageProducer
+    private final class BackgroundMessageProducer implements Runnable
     {
         static final String SHUTDOWN = "SHUTDOWN";
 
-        private final String _threadName;
+
         private final Queue _queue;
 
         private volatile Exception _exception;
 
-        private Thread _thread;
         private Map<String, Integer> _messageSequenceNumbersByKey = new 
HashMap<>();
         private CountDownLatch _quarterOfMessagesSentLatch = new 
CountDownLatch(MSG_COUNT / 4);
+        private int _numberOfUniqueKeyValues;
 
-        BackgroundMessageProducer(String threadName, Queue queue)
+        BackgroundMessageProducer(Queue queue, final int 
numberOfUniqueKeyValues)
         {
-            _threadName = threadName;
             _queue = queue;
+            _numberOfUniqueKeyValues = numberOfUniqueKeyValues;
         }
 
         void waitUntilQuarterOfMessagesSentToEncourageConflation() throws 
InterruptedException
@@ -489,68 +455,56 @@ public class LastValueQueueTest extends JmsTestBase
             return Collections.unmodifiableMap(_messageSequenceNumbersByKey);
         }
 
-        void startSendingMessages()
+        @Override
+        public void run()
         {
-            Runnable messageSender = () -> {
+            try
+            {
+                LOGGER.info("Starting to send in background thread");
+                final Connection producerConnection = getConnection();
                 try
                 {
-                    LOGGER.info("Starting to send in background thread");
-                    Connection producerConnection = getConnection();
-                    try
-                    {
-                        Session producerSession = 
producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-                        MessageProducer backgroundProducer = 
producerSession.createProducer(_queue);
-                        for (int messageNumber = 0; messageNumber < MSG_COUNT; 
messageNumber++)
-                        {
-                            Message message = nextMessage(messageNumber, 
producerSession, 2);
-                            backgroundProducer.send(message);
+                    final Session producerSession = 
producerConnection.createSession(true, Session.SESSION_TRANSACTED);
 
-                            putMessageInMap(message, 
_messageSequenceNumbersByKey);
-                            _quarterOfMessagesSentLatch.countDown();
-                        }
-
-                        Message shutdownMessage = 
producerSession.createMessage();
-                        shutdownMessage.setBooleanProperty(SHUTDOWN, true);
-                        // make sure the shutdown messages have distinct keys 
because the Qpid Cpp Broker will
-                        // otherwise consider them to have the same key.
-                        shutdownMessage.setStringProperty(KEY_PROPERTY, 
_threadName);
+                    final MessageProducer backgroundProducer = 
producerSession.createProducer(_queue);
+                    for (int messageNumber = 0; messageNumber < MSG_COUNT; 
messageNumber++)
+                    {
 
-                        backgroundProducer.send(shutdownMessage);
+                        final Message message = nextMessage(messageNumber, 
producerSession, _numberOfUniqueKeyValues);
+                        backgroundProducer.send(message);
+                        producerSession.commit();
 
-                        // make sure that all in-flight messages reach the 
Broker
-                        // before closing the connection
-                        producerSession.createTemporaryQueue().delete();
-                    }
-                    finally
-                    {
-                        producerConnection.close();
+                        putMessageInMap(message, _messageSequenceNumbersByKey);
+                        _quarterOfMessagesSentLatch.countDown();
                     }
 
-                    LOGGER.info("Finished sending in background thread");
+                    Message shutdownMessage = producerSession.createMessage();
+                    shutdownMessage.setBooleanProperty(SHUTDOWN, true);
+                    // make sure the shutdown messages have distinct keys 
because the Qpid Cpp Broker will
+                    // otherwise consider them to have the same key.
+                    shutdownMessage.setStringProperty(KEY_PROPERTY, 
Thread.currentThread().getName());
+
+                    backgroundProducer.send(shutdownMessage);
+                    producerSession.commit();
                 }
-                catch (Exception e)
+                finally
                 {
-                    _exception = e;
-                    LOGGER.warn("Unexpected exception in publisher", e);
+                    producerConnection.close();
                 }
-            };
 
-            _thread = new Thread(messageSender);
-            _thread.setName(_threadName);
-            _thread.start();
+                LOGGER.info("Finished sending in background thread");
+            }
+            catch (Exception e)
+            {
+                _exception = e;
+                LOGGER.warn("Unexpected exception in publisher", e);
+            }
         }
 
-        void join() throws InterruptedException
-        {
-            final int timeoutInMillis = 120000;
-            _thread.join(timeoutInMillis);
-            assertFalse("Expected producer thread to finish within " + 
timeoutInMillis + "ms", _thread.isAlive());
-        }
     }
 
-    private void createConflationQueue(final String queueName,
-                                       final String keyProperty, final boolean 
enforceBrowseOnly) throws Exception
+    private Queue createConflationQueue(final String queueName,
+                                        final String keyProperty, final 
boolean enforceBrowseOnly) throws Exception
     {
         final Map<String, Object> arguments = new HashMap<>();
         arguments.put(LastValueQueue.LVQ_KEY, keyProperty);
@@ -559,11 +513,12 @@ public class LastValueQueueTest extends JmsTestBase
             arguments.put("ensureNondestructiveConsumers", true);
         }
         createEntityUsingAmqpManagement(queueName, 
"org.apache.qpid.LastValueQueue", arguments);
+        return createQueue(queueName);
     }
 
     private Message nextMessage(int msg, Session producerSession) throws 
JMSException
     {
-        return nextMessage(msg, producerSession, 10);
+        return nextMessage(msg, producerSession, NUMBER_OF_UNIQUE_KEY_VALUES);
     }
 
     private Message nextMessage(int msg, Session producerSession, int 
numberOfUniqueKeyValues) throws JMSException


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to