Author: lquack
Date: Mon May  2 15:58:01 2016
New Revision: 1741994

URL: http://svn.apache.org/viewvc?rev=1741994&view=rev
Log:
QPID-7249: [Java Tests] Properly handle failures in AMQQueueDeferredOrderingTest

Modified:
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java

Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java?rev=1741994&r1=1741993&r2=1741994&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java Mon May  2 15:58:01 2016
@@ -20,10 +20,18 @@
  */
 package org.apache.qpid.client;
 
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
 import javax.jms.Connection;
+import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
+import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
@@ -34,26 +42,24 @@ import org.apache.qpid.test.utils.QpidBr
 
 public class AMQQueueDeferredOrderingTest extends QpidBrokerTestCase
 {
+    private static final Logger _logger = 
LoggerFactory.getLogger(AMQQueueDeferredOrderingTest.class);
     private Connection con;
     private Session session;
-    private AMQQueue queue;
+    private Queue queue;
     private MessageConsumer consumer;
     private int _numMessages;
+    private ExecutorService _executor;
+    private volatile boolean _shutdownThreads;
 
-    private static final Logger _logger = 
LoggerFactory.getLogger(AMQQueueDeferredOrderingTest.class);
-
-    private ASyncProducer producerThread;
-
-    private class ASyncProducer extends Thread
+    private class ASyncProducer implements Callable<Exception>
     {
-
         private MessageProducer producer;
         private final Logger _logger = 
LoggerFactory.getLogger(ASyncProducer.class);
         private Session session;
         private int start;
         private int end;
 
-        public ASyncProducer(AMQQueue q, int start, int end) throws Exception
+        public ASyncProducer(Queue q, int start, int end) throws JMSException
         {
             this.session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
             this._logger.info("Create Consumer of Q1");
@@ -62,12 +68,12 @@ public class AMQQueueDeferredOrderingTes
             this.end = end;
         }
 
-        public void run()
+        public Exception call()
         {
             try
             {
                 this._logger.info("Starting to send messages");
-                for (int i = start; i < end && !interrupted(); i++)
+                for (int i = start; i < end && !_shutdownThreads; i++)
                 {
                     
producer.send(session.createTextMessage(Integer.toString(i)));
                     ((AMQSession<?, ?>)session).sync();
@@ -76,8 +82,9 @@ public class AMQQueueDeferredOrderingTes
             }
             catch (Exception e)
             {
-                throw new RuntimeException(e);
+                return e;
             }
+            return null;
         }
     }
 
@@ -92,28 +99,28 @@ public class AMQQueueDeferredOrderingTes
         _logger.info("Create Session");
         session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
         _logger.info("Create Q");
-        queue = new AMQQueue("amq.direct", "Q", "Q",
-                false, true);
+        queue = getTestQueue();
         _logger.info("Create Consumer of Q");
         consumer = session.createConsumer(queue);
         _logger.info("Start Connection");
         con.start();
+        _executor = Executors.newSingleThreadExecutor();
     }
 
     public void testMessagesSentByTwoThreadsAreDeliveredInOrder() throws 
Exception
     {
+        Future<Exception> f;
+        Exception publisherException;
 
         // Setup initial messages
         _logger.info("Creating first producer thread");
-        producerThread = new ASyncProducer(queue, 0, _numMessages / 2);
-        producerThread.start();
-        // Wait for them to be done
-        producerThread.join();
+        f = _executor.submit(new ASyncProducer(queue, 0, _numMessages / 2));
+        publisherException = f.get(1, TimeUnit.MINUTES);
+        assertNull("Publishing first batch failed: " + publisherException, 
publisherException);
 
         // Setup second set of messages to produce while we consume
         _logger.info("Creating second producer thread");
-        producerThread = new ASyncProducer(queue, _numMessages / 2, 
_numMessages);
-        producerThread.start();
+        f = _executor.submit(new ASyncProducer(queue, _numMessages / 2, 
_numMessages));
 
         // Start consuming and checking they're in order
         _logger.info("Consuming messages");
@@ -125,16 +132,31 @@ public class AMQQueueDeferredOrderingTes
             assertTrue("Message " + i + " should be a text message", msg 
instanceof TextMessage);
             assertEquals("Message content " + i + " does not match expected", 
Integer.toString(i), ((TextMessage) msg).getText());
         }
+        publisherException = f.get(10, TimeUnit.SECONDS);
+        assertNull("Publishing second batch failed: " + publisherException, 
publisherException);
     }
 
     protected void tearDown() throws Exception
     {
-        _logger.info("Interrupting producer thread");
-        producerThread.interrupt();
-        _logger.info("Closing connection");
-        con.close();
-
-        super.tearDown();
+        try
+        {
+            _logger.info("Interrupting producer thread");
+            _shutdownThreads = true;
+            _executor.shutdown();
+            assertTrue("Executor service failed to shutdown", 
_executor.awaitTermination(1, TimeUnit.MINUTES));
+        }
+        finally
+        {
+            try
+            {
+                _logger.info("Closing connection");
+                con.close();
+            }
+            finally
+            {
+                super.tearDown();
+            }
+        }
     }
 
     public static junit.framework.Test suite()



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to