Author: lquack
Date: Mon May 2 15:58:01 2016
New Revision: 1741994
URL: http://svn.apache.org/viewvc?rev=1741994&view=rev
Log:
QPID-7249: [Java Tests] Properly handle failures in AMQQueueDeferredOrderingTest
Modified:
qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java
Modified:
qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java?rev=1741994&r1=1741993&r2=1741994&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java Mon May 2 15:58:01 2016
@@ -20,10 +20,18 @@
*/
package org.apache.qpid.client;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
import javax.jms.Connection;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
+import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
@@ -34,26 +42,24 @@ import org.apache.qpid.test.utils.QpidBr
public class AMQQueueDeferredOrderingTest extends QpidBrokerTestCase
{
+ private static final Logger _logger = LoggerFactory.getLogger(AMQQueueDeferredOrderingTest.class);
private Connection con;
private Session session;
- private AMQQueue queue;
+ private Queue queue;
private MessageConsumer consumer;
private int _numMessages;
+ private ExecutorService _executor;
+ private volatile boolean _shutdownThreads;
- private static final Logger _logger = LoggerFactory.getLogger(AMQQueueDeferredOrderingTest.class);
-
- private ASyncProducer producerThread;
-
- private class ASyncProducer extends Thread
+ private class ASyncProducer implements Callable<Exception>
{
-
private MessageProducer producer;
private final Logger _logger =
LoggerFactory.getLogger(ASyncProducer.class);
private Session session;
private int start;
private int end;
- public ASyncProducer(AMQQueue q, int start, int end) throws Exception
+ public ASyncProducer(Queue q, int start, int end) throws JMSException
{
this.session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
this._logger.info("Create Consumer of Q1");
@@ -62,12 +68,12 @@ public class AMQQueueDeferredOrderingTes
this.end = end;
}
- public void run()
+ public Exception call()
{
try
{
this._logger.info("Starting to send messages");
- for (int i = start; i < end && !interrupted(); i++)
+ for (int i = start; i < end && !_shutdownThreads; i++)
{
producer.send(session.createTextMessage(Integer.toString(i)));
((AMQSession<?, ?>)session).sync();
@@ -76,8 +82,9 @@ public class AMQQueueDeferredOrderingTes
}
catch (Exception e)
{
- throw new RuntimeException(e);
+ return e;
}
+ return null;
}
}
@@ -92,28 +99,28 @@ public class AMQQueueDeferredOrderingTes
_logger.info("Create Session");
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
_logger.info("Create Q");
- queue = new AMQQueue("amq.direct", "Q", "Q",
- false, true);
+ queue = getTestQueue();
_logger.info("Create Consumer of Q");
consumer = session.createConsumer(queue);
_logger.info("Start Connection");
con.start();
+ _executor = Executors.newSingleThreadExecutor();
}
public void testMessagesSentByTwoThreadsAreDeliveredInOrder() throws
Exception
{
+ Future<Exception> f;
+ Exception publisherException;
// Setup initial messages
_logger.info("Creating first producer thread");
- producerThread = new ASyncProducer(queue, 0, _numMessages / 2);
- producerThread.start();
- // Wait for them to be done
- producerThread.join();
+ f = _executor.submit(new ASyncProducer(queue, 0, _numMessages / 2));
+ publisherException = f.get(1, TimeUnit.MINUTES);
+ assertNull("Publishing first batch failed: " + publisherException, publisherException);
// Setup second set of messages to produce while we consume
_logger.info("Creating second producer thread");
- producerThread = new ASyncProducer(queue, _numMessages / 2, _numMessages);
- producerThread.start();
+ f = _executor.submit(new ASyncProducer(queue, _numMessages / 2, _numMessages));
// Start consuming and checking they're in order
_logger.info("Consuming messages");
@@ -125,16 +132,31 @@ public class AMQQueueDeferredOrderingTes
assertTrue("Message " + i + " should be a text message", msg
instanceof TextMessage);
assertEquals("Message content " + i + " does not match expected",
Integer.toString(i), ((TextMessage) msg).getText());
}
+ publisherException = f.get(10, TimeUnit.SECONDS);
+ assertNull("Publishing second batch failed: " + publisherException, publisherException);
}
protected void tearDown() throws Exception
{
- _logger.info("Interrupting producer thread");
- producerThread.interrupt();
- _logger.info("Closing connection");
- con.close();
-
- super.tearDown();
+ try
+ {
+ _logger.info("Interrupting producer thread");
+ _shutdownThreads = true;
+ _executor.shutdown();
+ assertTrue("Executor service failed to shutdown", _executor.awaitTermination(1, TimeUnit.MINUTES));
+ }
+ finally
+ {
+ try
+ {
+ _logger.info("Closing connection");
+ con.close();
+ }
+ finally
+ {
+ super.tearDown();
+ }
+ }
}
public static junit.framework.Test suite()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]