amarkevich closed pull request #491: cxf-rt-transports-jms: improve test 
stability
URL: https://github.com/apache/cxf/pull/491
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a pull request from a fork once
it has been merged, the diff is reproduced below for the sake of provenance:

diff --git 
a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
 
b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
index 9340ef9a349..37acc66fee1 100644
--- 
a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
+++ 
b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
@@ -18,16 +18,16 @@
  */
 package org.apache.cxf.transport.jms;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Reader;
-import java.io.StringReader;
 import java.io.Writer;
 import java.net.URL;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
 import javax.xml.namespace.QName;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
@@ -39,36 +39,35 @@
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.ExchangeImpl;
 import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
 import org.apache.cxf.service.Service;
 import org.apache.cxf.service.model.EndpointInfo;
 import org.apache.cxf.testutil.common.TestUtil;
 import org.apache.cxf.transport.Conduit;
 import org.apache.cxf.transport.MessageObserver;
-import org.apache.cxf.ws.addressing.EndpointReferenceType;
 import org.apache.cxf.wsdl11.WSDLServiceFactory;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 
-import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public abstract class AbstractJMSTester {
     protected static final String WSDL = "/jms_test.wsdl";
     protected static final String SERVICE_NS = 
"http://cxf.apache.org/hello_world_jms";
     protected static final int MAX_RECEIVE_TIME = 10;
-    protected static final String MESSAGE_CONTENT = "HelloWorld";
     protected static Bus bus;
     protected static ActiveMQConnectionFactory cf1;
     protected static ConnectionFactory cf;
     protected static BrokerService broker;
+    private static final String MESSAGE_CONTENT = "HelloWorld";
 
     protected enum ExchangePattern { oneway, requestReply };
 
-    protected EndpointReferenceType target;
-    protected Message inMessage;
-    protected Message destMessage;
+    private final AtomicReference<Message> inMessage = new AtomicReference<>();
+    private final AtomicReference<Message> destMessage = new 
AtomicReference<>();
 
     @BeforeClass
     public static void startSerices() throws Exception {
@@ -92,12 +91,12 @@ public static void stopServices() throws Exception {
         broker.stop();
     }
 
-    protected EndpointInfo setupServiceInfo(String serviceName, String 
portName) {
+    protected static EndpointInfo setupServiceInfo(String serviceName, String 
portName) {
         return setupServiceInfo(SERVICE_NS, WSDL, serviceName, portName);
     }
 
-    protected EndpointInfo setupServiceInfo(String ns, String wsdl, String 
serviceName, String portName) {
-        URL wsdlUrl = getClass().getResource(wsdl);
+    protected static EndpointInfo setupServiceInfo(String ns, String wsdl, 
String serviceName, String portName) {
+        URL wsdlUrl = AbstractJMSTester.class.getResource(wsdl);
         if (wsdlUrl == null) {
             throw new IllegalArgumentException("Wsdl file not found on class 
path " + wsdl);
         }
@@ -113,64 +112,72 @@ protected EndpointInfo setupServiceInfo(String ns, String 
wsdl, String serviceNa
     protected MessageObserver createMessageObserver() {
         return new MessageObserver() {
             public void onMessage(Message m) {
-                Exchange exchange = new ExchangeImpl();
-                exchange.setInMessage(m);
-                m.setExchange(exchange);
-                destMessage = m;
+//                Exchange exchange = new ExchangeImpl();
+//                exchange.setInMessage(m);
+//                m.setExchange(exchange);
+                destMessage.set(m);
+                synchronized (destMessage) {
+                    destMessage.notifyAll();
+                }
             }
         };
     }
 
-    protected void sendMessageAsync(Conduit conduit, Message message) throws 
IOException {
+    protected static void sendMessageAsync(Conduit conduit, Message message) 
throws IOException {
         sendoutMessage(conduit, message, false, false);
     }
 
-    protected void sendMessageSync(Conduit conduit, Message message) throws 
IOException {
+    protected static void sendMessageSync(Conduit conduit, Message message) 
throws IOException {
         sendoutMessage(conduit, message, false, true);
     }
 
-    protected void sendMessage(Conduit conduit, Message message, boolean 
synchronous) throws IOException {
+    protected static void sendMessage(Conduit conduit, Message message, 
boolean synchronous) throws IOException {
         sendoutMessage(conduit, message, false, synchronous);
     }
 
-    protected void sendOneWayMessage(Conduit conduit, Message message) throws 
IOException {
+    protected static void sendOneWayMessage(Conduit conduit, Message message) 
throws IOException {
         sendoutMessage(conduit, message, true, true);
     }
 
-    private void sendoutMessage(Conduit conduit,
+    private static void sendoutMessage(Conduit conduit,
                                   Message message,
                                   boolean isOneWay,
                                   boolean synchronous) throws IOException {
-
-        Exchange exchange = new ExchangeImpl();
+        final Exchange exchange = new ExchangeImpl();
         exchange.setOneWay(isOneWay);
         exchange.setSynchronous(synchronous);
         message.setExchange(exchange);
         exchange.setOutMessage(message);
         conduit.prepare(message);
-        OutputStream os = message.getContent(OutputStream.class);
-        Writer writer = message.getContent(Writer.class);
-        assertTrue("The OutputStream and Writer should not both be null ", os 
!= null || writer != null);
-        if (os != null) {
-            os.write(MESSAGE_CONTENT.getBytes()); // TODO encoding
-            os.close();
-        } else {
-            writer.write(MESSAGE_CONTENT);
-            writer.close();
+        try (OutputStream os = message.getContent(OutputStream.class)) {
+            if (os != null) {
+                os.write(MESSAGE_CONTENT.getBytes()); // TODO encoding
+                return;
+            }
+        }
+        try (Writer writer = message.getContent(Writer.class)) {
+            if (writer != null) {
+                writer.write(MESSAGE_CONTENT);
+                return;
+            }
         }
+        fail("The OutputStream and Writer should not both be null");
     }
 
-    protected JMSConduit setupJMSConduit(EndpointInfo ei) throws IOException {
+    protected static JMSConduit setupJMSConduit(EndpointInfo ei) throws 
IOException {
         JMSConfiguration jmsConfig = 
JMSConfigFactory.createFromEndpointInfo(bus, ei, null);
         jmsConfig.setConnectionFactory(cf);
-        return new JMSConduit(target, jmsConfig, bus);
+        return new JMSConduit(null, jmsConfig, bus);
     }
 
     protected JMSConduit setupJMSConduitWithObserver(EndpointInfo ei) throws 
IOException {
         JMSConduit jmsConduit = setupJMSConduit(ei);
         MessageObserver observer = new MessageObserver() {
             public void onMessage(Message m) {
-                inMessage = m;
+                inMessage.set(m);
+                synchronized (inMessage) {
+                    inMessage.notifyAll();
+                }
             }
         };
         jmsConduit.setMessageObserver(observer);
@@ -183,58 +190,85 @@ protected JMSDestination setupJMSDestination(EndpointInfo 
ei) throws IOException
         return new JMSDestination(bus, ei, jmsConfig);
     }
 
-    protected String getContent(Message message) {
-        ByteArrayInputStream bis = 
(ByteArrayInputStream)message.getContent(InputStream.class);
+    protected static Message createMessage() {
+        return createMessage(null);
+    }
+
+    protected static Message createMessage(String correlationId) {
+        Message outMessage = new MessageImpl();
+        JMSMessageHeadersType header = new JMSMessageHeadersType();
+        header.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
+        header.setJMSPriority(1);
+        header.setTimeToLive(1000L);
+        outMessage.put(JMSConstants.JMS_CLIENT_REQUEST_HEADERS, header);
+        outMessage.put(Message.ENCODING, "US-ASCII");
+        return outMessage;
+    }
+
+    protected static void verifyReceivedMessage(Message message) {
         String response = "<not found>";
+        InputStream bis = message.getContent(InputStream.class);
         if (bis != null) {
-            byte[] bytes = new byte[bis.available()];
             try {
+                byte[] bytes = new byte[bis.available()];
                 bis.read(bytes);
+                response = IOUtils.newStringFromBytes(bytes);
             } catch (IOException ex) {
-                assertFalse("Read the Destination recieved Message error ", 
false);
-                ex.printStackTrace();
+                fail("Read the Destination recieved Message error: " + 
ex.getMessage());
             }
-            response = IOUtils.newStringFromBytes(bytes);
         } else {
-            StringReader reader = 
(StringReader)message.getContent(Reader.class);
+            Reader reader = message.getContent(Reader.class);
             char[] buffer = new char[5000];
             try {
                 int i = reader.read(buffer);
                 response = new String(buffer, 0, i);
             } catch (IOException e) {
-                assertFalse("Read the Destination recieved Message error ", 
false);
-                e.printStackTrace();
+                fail("Read the Destination recieved Message error: " + 
e.getMessage());
             }
         }
-        return response;
+        assertEquals("The response content should be equal", MESSAGE_CONTENT, 
response);
     }
 
-    protected void waitForReceiveInMessage() {
-        int waitTime = 0;
-        while (inMessage == null && waitTime < MAX_RECEIVE_TIME * 10) {
-            try {
-                Thread.sleep(100);
-            } catch (InterruptedException e) {
-                // do nothing here
+    protected static void verifyHeaders(Message msgIn, Message msgOut) {
+        JMSMessageHeadersType outHeader = (JMSMessageHeadersType)msgOut
+            .get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
+
+        JMSMessageHeadersType inHeader = (JMSMessageHeadersType)msgIn
+            .get(JMSConstants.JMS_SERVER_REQUEST_HEADERS);
+
+        verifyJmsHeaderEquality(outHeader, inHeader);
+
+    }
+
+    protected static void verifyJmsHeaderEquality(JMSMessageHeadersType 
outHeader, JMSMessageHeadersType inHeader) {
+        assertEquals("The inMessage and outMessage JMS Header's JMSPriority 
should be equals", outHeader
+            .getJMSPriority(), inHeader.getJMSPriority());
+        assertEquals("The inMessage and outMessage JMS Header's 
JMSDeliveryMode should be equals", outHeader
+                     .getJMSDeliveryMode(), inHeader.getJMSDeliveryMode());
+        assertEquals("The inMessage and outMessage JMS Header's JMSType should 
be equals", outHeader
+            .getJMSType(), inHeader.getJMSType());
+    }
+
+
+    protected Message waitForReceiveInMessage() throws InterruptedException {
+        if (null == inMessage.get()) {
+            synchronized (inMessage) {
+                inMessage.wait(MAX_RECEIVE_TIME * 1000L);
             }
-            waitTime++;
+            assertNotNull("Can't receive the Conduit Message in " + 
MAX_RECEIVE_TIME + " seconds", inMessage.get());
         }
-        assertTrue("Can't receive the Conduit Message in " + MAX_RECEIVE_TIME 
+ " seconds",
-                   inMessage != null);
+        return inMessage.getAndSet(null);
     }
 
-    protected void waitForReceiveDestMessage() {
-        int waitTime = 0;
-        while (destMessage == null && waitTime < MAX_RECEIVE_TIME * 10) {
-            try {
-                Thread.sleep(100);
-            } catch (InterruptedException e) {
-                // do nothing here
+    protected Message waitForReceiveDestMessage() throws InterruptedException {
+        if (null == destMessage.get()) {
+            synchronized (destMessage) {
+                destMessage.wait(MAX_RECEIVE_TIME * 1000L);
             }
-            waitTime++;
+            assertNotNull("Can't receive the Destination message in " + 
MAX_RECEIVE_TIME + " seconds",
+                    destMessage.get());
         }
-        assertNotNull("Can't receive the Destination message in " + 
MAX_RECEIVE_TIME
-                   + " seconds", destMessage);
+        return destMessage.getAndSet(null);
     }
 
 }
\ No newline at end of file
diff --git 
a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java
 
b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java
index 6a6ee452fe5..bf34bd35617 100644
--- 
a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java
+++ 
b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java
@@ -36,8 +36,7 @@
 
     @Test
     public void testGetConfiguration() throws Exception {
-        EndpointInfo ei = 
setupServiceInfo("http://cxf.apache.org/hello_world_jms", WSDL,
-                         "HelloWorldQueueBinMsgService", 
"HelloWorldQueueBinMsgPort");
+        EndpointInfo ei = setupServiceInfo("HelloWorldQueueBinMsgService", 
"HelloWorldQueueBinMsgPort");
         JMSConduit conduit = setupJMSConduit(ei);
         assertEquals("Can't get the right ClientReceiveTimeout", 500L, 
conduit.getJmsConfig()
             .getReceiveTimeout().longValue());
@@ -46,8 +45,7 @@ public void testGetConfiguration() throws Exception {
 
     @Test
     public void testPrepareSend() throws Exception {
-        EndpointInfo ei = 
setupServiceInfo("http://cxf.apache.org/hello_world_jms", WSDL,
-                         "HelloWorldService", "HelloWorldPort");
+        EndpointInfo ei = setupServiceInfo("HelloWorldService", 
"HelloWorldPort");
 
         JMSConduit conduit = setupJMSConduit(ei);
         Message message = new MessageImpl();
@@ -66,12 +64,11 @@ public void testPrepareSend() throws Exception {
      */
     @Test
     public void testTimeoutOnReceive() throws Exception {
-        EndpointInfo ei = 
setupServiceInfo("http://cxf.apache.org/hello_world_jms", WSDL,
-                         "HelloWorldServiceLoop", "HelloWorldPortLoop");
+        EndpointInfo ei = setupServiceInfo("HelloWorldServiceLoop", 
"HelloWorldPortLoop");
 
         JMSConduit conduit = setupJMSConduitWithObserver(ei);
         // If the system is extremely fast. The message could still get through
-        conduit.getJmsConfig().setReceiveTimeout(Long.valueOf(1));
+        conduit.getJmsConfig().setReceiveTimeout(1L);
         Message message = new MessageImpl();
         try {
             sendMessageSync(conduit, message);
diff --git 
a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConfigFactoryTest.java
 
b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConfigFactoryTest.java
index c15940fe5ef..f785a0f5718 100644
--- 
a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConfigFactoryTest.java
+++ 
b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConfigFactoryTest.java
@@ -39,7 +39,7 @@
     @Test
     public void testUsernameAndPassword() throws Exception {
         EndpointInfo ei = setupServiceInfo("HelloWorldService", 
"HelloWorldPort");
-        JMSConfiguration config = JMSConfigFactory.createFromEndpointInfo(bus, 
ei, target);
+        JMSConfiguration config = JMSConfigFactory.createFromEndpointInfo(bus, 
ei, null);
         Assert.assertEquals("User name does not match.", "testUser", 
config.getUserName());
         Assert.assertEquals("Password does not match.", "testPassword", 
config.getPassword());
     }
@@ -92,7 +92,7 @@ public void testConcurrentConsumers() {
     @Test
     public void testMessageSelectorIsSet() {
         EndpointInfo ei = setupServiceInfo("HelloWorldSelectorService", 
"HelloWorldPort");
-        JMSConfiguration config = JMSConfigFactory.createFromEndpointInfo(bus, 
ei, target);
+        JMSConfiguration config = JMSConfigFactory.createFromEndpointInfo(bus, 
ei, null);
         Assert.assertEquals("customJMSAttribute=helloWorld", 
config.getMessageSelector());
     }
 }
diff --git 
a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
 
b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
index f216f6ba463..22a770ced4d 100644
--- 
a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
+++ 
b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
@@ -19,12 +19,6 @@
 
 package org.apache.cxf.transport.jms;
 
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.Reader;
-import java.io.StringReader;
-
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
@@ -33,7 +27,6 @@
 import javax.jms.Queue;
 import javax.jms.Topic;
 
-import org.apache.cxf.helpers.IOUtils;
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.ExchangeImpl;
 import org.apache.cxf.message.Message;
@@ -49,7 +42,6 @@
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -68,17 +60,15 @@ public void testGetConfigurationFromWSDL() throws Exception 
{
 
     @Test
     public void testDurableSubscriber() throws Exception {
-        destMessage = null;
         EndpointInfo ei = setupServiceInfo("HelloWorldPubSubService", 
"HelloWorldPubSubPort");
         JMSConduit conduit = setupJMSConduitWithObserver(ei);
-        Message outMessage = new MessageImpl();
-        setupMessageHeader(outMessage);
+        Message outMessage = createMessage();
         JMSDestination destination = setupJMSDestination(ei);
         destination.setMessageObserver(createMessageObserver());
         // The JMSBroker (ActiveMQ 5.x) need to take some time to setup the 
DurableSubscriber
-        Thread.sleep(500);
+        Thread.sleep(500L);
         sendOneWayMessage(conduit, outMessage);
-        waitForReceiveDestMessage();
+        Message destMessage = waitForReceiveDestMessage();
 
         assertNotNull("The destiantion should have got the message ", 
destMessage);
         verifyReceivedMessage(destMessage);
@@ -86,7 +76,7 @@ public void testDurableSubscriber() throws Exception {
         conduit.close();
         destination.shutdown();
     }
-    
+
     @Test(expected = InvalidClientIDException.class)
     public void testDurableInvalidClientId() throws Throwable {
         Connection con = cf1.createConnection();
@@ -94,7 +84,6 @@ public void testDurableInvalidClientId() throws Throwable {
         try {
             con.setClientID("testClient");
             con.start();
-            destMessage = null;
             EndpointInfo ei = setupServiceInfo("HelloWorldPubSubService", 
"HelloWorldPubSubPort");
             JMSConfiguration jmsConfig = 
JMSConfigFactory.createFromEndpointInfo(bus, ei, null);
             jmsConfig.setDurableSubscriptionClientId("testClient");
@@ -117,12 +106,11 @@ public void testOneWayDestination() throws Exception {
         destination.setMessageObserver(createMessageObserver());
 
         JMSConduit conduit = setupJMSConduitWithObserver(ei);
-        Message outMessage = new MessageImpl();
-        setupMessageHeader(outMessage);
+        Message outMessage = createMessage();
 
         sendOneWayMessage(conduit, outMessage);
         // wait for the message to be get from the destination
-        waitForReceiveDestMessage();
+        Message destMessage = waitForReceiveDestMessage();
         // just verify the Destination inMessage
         assertNotNull("The destiantion should have got the message ", 
destMessage);
         verifyReceivedMessage(destMessage);
@@ -131,51 +119,17 @@ public void testOneWayDestination() throws Exception {
         destination.shutdown();
     }
 
-    private void setupMessageHeader(Message outMessage, String correlationId, 
String replyTo) {
+    private static void setupMessageHeader(Message outMessage, String 
correlationId, String replyTo) {
         JMSMessageHeadersType header = new JMSMessageHeadersType();
         header.setJMSCorrelationID(correlationId);
         header.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
         header.setJMSPriority(1);
-        header.setTimeToLive(1000);
-        header.setJMSReplyTo(replyTo != null ? replyTo : null);
+        header.setTimeToLive(1000L);
+        header.setJMSReplyTo(replyTo);
         outMessage.put(JMSConstants.JMS_CLIENT_REQUEST_HEADERS, header);
         outMessage.put(Message.ENCODING, "US-ASCII");
     }
 
-    private void setupMessageHeader(Message outMessage) {
-        setupMessageHeader(outMessage, "Destination test", null);
-    }
-
-    private void setupMessageHeader(Message outMessage, String correlationId) {
-        setupMessageHeader(outMessage, correlationId, null);
-    }
-
-    private void verifyReceivedMessage(Message message) {
-        ByteArrayInputStream bis = 
(ByteArrayInputStream)message.getContent(InputStream.class);
-        String response = "<not found>";
-        if (bis != null) {
-            byte[] bytes = new byte[bis.available()];
-            try {
-                bis.read(bytes);
-            } catch (IOException ex) {
-                assertFalse("Read the Destination recieved Message error ", 
false);
-                ex.printStackTrace();
-            }
-            response = IOUtils.newStringFromBytes(bytes);
-        } else {
-            StringReader reader = 
(StringReader)message.getContent(Reader.class);
-            char[] buffer = new char[5000];
-            try {
-                int i = reader.read(buffer);
-                response = new String(buffer, 0, i);
-            } catch (IOException e) {
-                assertFalse("Read the Destination recieved Message error ", 
false);
-                e.printStackTrace();
-            }
-        }
-        assertEquals("The response content should be equal", 
AbstractJMSTester.MESSAGE_CONTENT, response);
-    }
-
     private void verifyRequestResponseHeaders(Message msgIn, Message msgOut) {
         JMSMessageHeadersType outHeader = (JMSMessageHeadersType)msgOut
             .get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
@@ -188,27 +142,6 @@ private void verifyRequestResponseHeaders(Message msgIn, 
Message msgOut) {
             .get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS);
 
         verifyJmsHeaderEquality(outHeader, inHeader);
-
-    }
-
-    private void verifyHeaders(Message msgIn, Message msgOut) {
-        JMSMessageHeadersType outHeader = (JMSMessageHeadersType)msgOut
-            .get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
-
-        JMSMessageHeadersType inHeader = (JMSMessageHeadersType)msgIn
-            .get(JMSConstants.JMS_SERVER_REQUEST_HEADERS);
-
-        verifyJmsHeaderEquality(outHeader, inHeader);
-
-    }
-
-    private void verifyJmsHeaderEquality(JMSMessageHeadersType outHeader, 
JMSMessageHeadersType inHeader) {
-        assertEquals("The inMessage and outMessage JMS Header's JMSPriority 
should be equals", outHeader
-            .getJMSPriority(), inHeader.getJMSPriority());
-        assertEquals("The inMessage and outMessage JMS Header's 
JMSDeliveryMode should be equals", outHeader
-                     .getJMSDeliveryMode(), inHeader.getJMSDeliveryMode());
-        assertEquals("The inMessage and outMessage JMS Header's JMSType should 
be equals", outHeader
-            .getJMSType(), inHeader.getJMSType());
     }
 
     @Test
@@ -233,8 +166,7 @@ private Message testRoundTripDestination(boolean 
createSecurityContext) throws E
         JMSConduit conduit = setupJMSConduitWithObserver(ei);
         conduit.getJmsConfig().setCreateSecurityContext(createSecurityContext);
 
-        final Message outMessage = new MessageImpl();
-        setupMessageHeader(outMessage, null);
+        final Message outMessage = createMessage();
         final JMSDestination destination = setupJMSDestination(ei);
 
 
@@ -263,19 +195,17 @@ public void onMessage(Message m) {
         // wait for the message to be got from the destination,
         // create the thread to handler the Destination incoming message
 
-        waitForReceiveInMessage();
-        verifyReceivedMessage(inMessage);
+        verifyReceivedMessage(waitForReceiveInMessage());
         // wait for a while for the jms session recycling
 
-        inMessage = null;
         // Send a second message to check for an issue
         // Where the session was closed the second time
         sendMessageSync(conduit, outMessage);
-        waitForReceiveInMessage();
+        Message inMessage = waitForReceiveInMessage();
         verifyReceivedMessage(inMessage);
 
         // wait for a while for the jms session recycling
-        Thread.sleep(1000);
+//        Thread.sleep(1000L);
         conduit.close();
         destination.shutdown();
 
@@ -289,8 +219,7 @@ public void testProperty() throws Exception {
 
         // set up the conduit send to be true
         JMSConduit conduit = setupJMSConduitWithObserver(ei);
-        final Message outMessage = new MessageImpl();
-        setupMessageHeader(outMessage, null);
+        final Message outMessage = createMessage();
 
         JMSMessageHeadersType headers = (JMSMessageHeadersType)outMessage
             .get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
@@ -325,7 +254,7 @@ public void onMessage(Message m) {
         // wait for the message to be got from the destination,
         // create the thread to handler the Destination incoming message
 
-        waitForReceiveInMessage();
+        Message inMessage = waitForReceiveInMessage();
         verifyReceivedMessage(inMessage);
 
         verifyRequestResponseHeaders(inMessage, outMessage);
@@ -336,7 +265,7 @@ public void onMessage(Message m) {
         // TODO we need to check the SOAP JMS transport properties here
 
         // wait for a while for the jms session recycling
-        Thread.sleep(1000);
+//        Thread.sleep(1000L);
         conduit.close();
         destination.shutdown();
     }
@@ -371,10 +300,9 @@ private SecurityContext testSecurityContext(boolean 
createSecurityContext) throw
         destination.setMessageObserver(createMessageObserver());
         // set up the conduit send to be true
         JMSConduit conduit = setupJMSConduitWithObserver(ei);
-        final Message outMessage = new MessageImpl();
-        setupMessageHeader(outMessage, null);
+        final Message outMessage = createMessage();
         sendOneWayMessage(conduit, outMessage);
-        waitForReceiveDestMessage();
+        Message destMessage = waitForReceiveDestMessage();
         SecurityContext securityContext = 
destMessage.get(SecurityContext.class);
 
         conduit.close();
@@ -390,30 +318,26 @@ public void testOneWayReplyToSetUnset() throws Exception {
         /* 1. Test that replyTo destination set in WSDL is NOT used
          * in spec compliant mode */
 
-        destMessage = null;
         EndpointInfo ei = setupServiceInfo(
                          "HWStaticReplyQBinMsgService", 
"HWStaticReplyQBinMsgPort");
         JMSConduit conduit = setupJMSConduitWithObserver(ei);
-        Message outMessage = new MessageImpl();
-        setupMessageHeader(outMessage);
+        Message outMessage = createMessage();
         JMSDestination destination = setupJMSDestination(ei);
         destination.setMessageObserver(createMessageObserver());
         sendOneWayMessage(conduit, outMessage);
-        waitForReceiveDestMessage();
+        Message destMessage = waitForReceiveDestMessage();
         // just verify the Destination inMessage
         assertNotNull("The destination should have got the message ", 
destMessage);
         verifyReplyToNotSet(destMessage);
-        destMessage = null;
 
         /* 2. Test that replyTo destination set in WSDL IS used
          * in spec non-compliant mode */
 
         sendOneWayMessage(conduit, outMessage);
-        waitForReceiveDestMessage();
+        destMessage = waitForReceiveDestMessage();
         assertNotNull("The destination should have got the message ", 
destMessage);
         String exName = 
getQueueName(conduit.getJmsConfig().getReplyDestination());
         verifyReplyToSet(destMessage, Queue.class, exName);
-        destMessage = null;
 
         /* 3. Test that replyTo destination provided via invocation context
          * overrides the value set in WSDL and IS used in spec non-compliant 
mode */
@@ -422,34 +346,31 @@ public void testOneWayReplyToSetUnset() throws Exception {
         exName += ".context";
         setupMessageHeader(outMessage, "cidValue", contextReplyTo);
         sendOneWayMessage(conduit, outMessage);
-        waitForReceiveDestMessage();
+        destMessage = waitForReceiveDestMessage();
         assertNotNull("The destiantion should have got the message ", 
destMessage);
         verifyReplyToSet(destMessage, Queue.class, exName);
-        destMessage = null;
 
         /* 4. Test that replyTo destination provided via invocation context
          * and the value set in WSDL are NOT used in spec non-compliant mode
          * when JMSConstants.JMS_SET_REPLY_TO == false */
 
-        setupMessageHeader(outMessage);
+        setupMessageHeader(outMessage, null, null);
         outMessage.put(JMSConstants.JMS_SET_REPLY_TO, Boolean.FALSE);
         sendOneWayMessage(conduit, outMessage);
-        waitForReceiveDestMessage();
+        destMessage = waitForReceiveDestMessage();
         assertNotNull("The destiantion should have got the message ", 
destMessage);
         verifyReplyToNotSet(destMessage);
-        destMessage = null;
 
         /* 5. Test that replyTo destination set in WSDL IS used in spec 
non-compliant
          * mode when JMSConstants.JMS_SET_REPLY_TO == true */
 
-        setupMessageHeader(outMessage);
+        setupMessageHeader(outMessage, null, null);
         outMessage.put(JMSConstants.JMS_SET_REPLY_TO, Boolean.TRUE);
         sendOneWayMessage(conduit, outMessage);
-        waitForReceiveDestMessage();
+        destMessage = waitForReceiveDestMessage();
         assertNotNull("The destiantion should have got the message ", 
destMessage);
         exName = getQueueName(conduit.getJmsConfig().getReplyDestination());
         verifyReplyToSet(destMessage, Queue.class, exName);
-        destMessage = null;
 
         conduit.close();
         destination.shutdown();
diff --git 
a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/MessageIdAsCorrelationIdJMSConduitTest.java
 
b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/MessageIdAsCorrelationIdJMSConduitTest.java
index 9d3e37a5e49..3df49fb0a82 100644
--- 
a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/MessageIdAsCorrelationIdJMSConduitTest.java
+++ 
b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/MessageIdAsCorrelationIdJMSConduitTest.java
@@ -21,7 +21,6 @@
 import javax.jms.ConnectionFactory;
 
 import org.apache.activemq.pool.PooledConnectionFactory;
-import org.apache.cxf.Bus;
 import org.apache.cxf.BusFactory;
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.ExchangeImpl;
@@ -30,9 +29,11 @@
 import org.apache.cxf.transport.jms.util.TestReceiver;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 
-import org.junit.Assert;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
 /**
  * Checks if a CXF client works correlates requests and responses correctly if 
the server sets the message id
  * as correlation id on the response message
@@ -40,7 +41,8 @@
 public class MessageIdAsCorrelationIdJMSConduitTest {
     private static final String SERVICE_QUEUE = "test";
     private static final String BROKER_URI = 
"vm://localhost?broker.persistent=false";
-    private ConnectionFactory connectionFactory;
+
+    private ConnectionFactory connectionFactory = new 
PooledConnectionFactory(BROKER_URI);
 
     @Test
     public void testSendReceiveWithTempReplyQueue() throws Exception {
@@ -52,13 +54,9 @@ public void testSendReceive() throws Exception {
         sendAndReceive(true, "testreply");
     }
 
-    public void sendAndReceive(boolean synchronous, String replyDestination) 
throws Exception {
-        BusFactory bf = BusFactory.newInstance();
-        Bus bus = bf.createBus();
-        BusFactory.setDefaultBus(bus);
+    private void sendAndReceive(boolean synchronous, String replyDestination) 
throws InterruptedException {
         EndpointReferenceType target = new EndpointReferenceType();
 
-        connectionFactory = new PooledConnectionFactory(BROKER_URI);
         TestReceiver receiver = new TestReceiver(connectionFactory, 
SERVICE_QUEUE, true);
         receiver.runAsync();
 
@@ -68,7 +66,7 @@ public void sendAndReceive(boolean synchronous, String 
replyDestination) throws
         jmsConfig.setUseConduitIdSelector(false);
         jmsConfig.setReplyDestination(replyDestination);
 
-        JMSConduit conduit = new JMSConduit(target, jmsConfig, bus);
+        JMSConduit conduit = new JMSConduit(target, jmsConfig, 
BusFactory.getDefaultBus());
         Exchange exchange = new ExchangeImpl();
         exchange.setSynchronous(synchronous);
         Message message = new MessageImpl();
@@ -76,21 +74,16 @@ public void sendAndReceive(boolean synchronous, String 
replyDestination) throws
         conduit.sendExchange(exchange, "Request");
         waitForAsyncReply(exchange);
         receiver.close();
-        if (exchange.getInMessage() == null) {
-            throw new RuntimeException("No reply received within 2 seconds");
-        }
+        assertNotNull("No reply received within 2 seconds", 
exchange.getInMessage());
         JMSMessageHeadersType inHeaders = 
(JMSMessageHeadersType)exchange.getInMessage()
             .get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS);
-        Assert.assertEquals(receiver.getRequestMessageId(), 
inHeaders.getJMSCorrelationID());
+        assertEquals(receiver.getRequestMessageId(), 
inHeaders.getJMSCorrelationID());
         conduit.close();
-        bus.shutdown(true);
     }
 
-    private void waitForAsyncReply(Exchange exchange) throws 
InterruptedException {
-        int count = 0;
-        while (exchange.getInMessage() == null && count <= 20) {
-            Thread.sleep(100);
-            count++;
+    private static void waitForAsyncReply(Exchange exchange) throws 
InterruptedException {
+        for (int count = 0; exchange.getInMessage() == null && count <= 20; 
count++) {
+            Thread.sleep(100L);
         }
     }
 
diff --git 
a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/PooledConnectionTempQueueTest.java
 
b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/PooledConnectionTempQueueTest.java
index a60ef7a104f..8cccc261c39 100644
--- 
a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/PooledConnectionTempQueueTest.java
+++ 
b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/PooledConnectionTempQueueTest.java
@@ -18,8 +18,6 @@
  */
 package org.apache.cxf.transport.jms;
 
-import java.util.concurrent.Executors;
-
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.JMSException;
@@ -32,16 +30,17 @@
 
 import org.apache.activemq.pool.PooledConnectionFactory;
 
-import org.junit.Assert;
 import org.junit.Test;
 
+import static org.junit.Assert.assertNotNull;
+
 public class PooledConnectionTempQueueTest {
 
     protected static final String SERVICE_QUEUE = "queue1";
 
     @Test
     public void testTempQueueIssue() throws JMSException, InterruptedException 
{
-        final PooledConnectionFactory cf = new 
PooledConnectionFactory("vm://localhost?broker.persistent=false");
+        final ConnectionFactory cf = new 
PooledConnectionFactory("vm://localhost?broker.persistent=false");
 
         Connection con1 = cf.createConnection();
         con1.start();
@@ -49,20 +48,18 @@ public void testTempQueueIssue() throws JMSException, 
InterruptedException {
         // This order seems to matter to reproduce the issue
         con1.close();
 
-        Executors.newSingleThreadExecutor().execute(new Runnable() {
-            public void run() {
-                try {
-                    receiveAndRespondWithMessageIdAsCorrelationId(cf, 
SERVICE_QUEUE);
-                } catch (JMSException e) {
-                    e.printStackTrace();
-                }
+        new Thread(() -> {
+            try {
+                receiveAndRespondWithMessageIdAsCorrelationId(cf, 
SERVICE_QUEUE);
+            } catch (Exception e) {
+                e.printStackTrace();
             }
-        });
+        }).start();
 
         sendWithReplyToTemp(cf, SERVICE_QUEUE);
     }
 
-    private void sendWithReplyToTemp(ConnectionFactory cf, String 
serviceQueue) throws JMSException,
+    private static void sendWithReplyToTemp(ConnectionFactory cf, String 
serviceQueue) throws JMSException,
         InterruptedException {
         Connection con = cf.createConnection();
         con.start();
@@ -74,11 +71,11 @@ private void sendWithReplyToTemp(ConnectionFactory cf, 
String serviceQueue) thro
         producer.send(msg);
 
         // This sleep also seems to matter
-        Thread.sleep(500);
+        Thread.sleep(500L);
 
         MessageConsumer consumer = session.createConsumer(tempQueue);
         Message replyMsg = consumer.receive();
-        Assert.assertNotNull(replyMsg);
+        assertNotNull(replyMsg);
         //System.out.println(replyMsg.getJMSCorrelationID());
 
         consumer.close();
@@ -88,7 +85,7 @@ private void sendWithReplyToTemp(ConnectionFactory cf, String 
serviceQueue) thro
         con.close();
     }
 
-    public void 
receiveAndRespondWithMessageIdAsCorrelationId(ConnectionFactory 
connectionFactory,
+    public static void 
receiveAndRespondWithMessageIdAsCorrelationId(ConnectionFactory 
connectionFactory,
                                                               String 
queueName) throws JMSException {
         Connection con = connectionFactory.createConnection();
         Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
diff --git 
a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/RequestResponseTest.java
 
b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/RequestResponseTest.java
index ab776448a33..053e78eb3c1 100644
--- 
a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/RequestResponseTest.java
+++ 
b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/RequestResponseTest.java
@@ -21,8 +21,6 @@
 
 import java.io.IOException;
 
-import javax.jms.DeliveryMode;
-
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.ExchangeImpl;
 import org.apache.cxf.message.Message;
@@ -33,36 +31,8 @@
 
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
-
 public class RequestResponseTest extends AbstractJMSTester {
 
-    private void verifyReceivedMessage(Message message) {
-        String response = getContent(message);
-        assertEquals("The response content should be equal", 
AbstractJMSTester.MESSAGE_CONTENT, response);
-    }
-
-    private void verifyHeaders(Message msgIn, Message msgOut) {
-        JMSMessageHeadersType outHeader = (JMSMessageHeadersType)msgOut
-            .get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
-
-        JMSMessageHeadersType inHeader = (JMSMessageHeadersType)msgIn
-            .get(JMSConstants.JMS_SERVER_REQUEST_HEADERS);
-
-        verifyJmsHeaderEquality(outHeader, inHeader);
-
-    }
-
-    private void verifyJmsHeaderEquality(JMSMessageHeadersType outHeader, 
JMSMessageHeadersType inHeader) {
-        assertEquals("The inMessage and outMessage JMS Header's JMSPriority 
should be equals", outHeader
-            .getJMSPriority(), inHeader.getJMSPriority());
-        assertEquals("The inMessage and outMessage JMS Header's 
JMSDeliveryMode should be equals", outHeader
-                     .getJMSDeliveryMode(), inHeader.getJMSDeliveryMode());
-        assertEquals("The inMessage and outMessage JMS Header's JMSType should 
be equals", outHeader
-            .getJMSType(), inHeader.getJMSType());
-    }
-
-
     @Test
     public void testRequestQueueResponseTempQueue() throws Exception {
         EndpointInfo ei = setupServiceInfo("http://cxf.apache.org/jms_simple", 
"/wsdl/jms_spec_testsuite.wsdl",
@@ -94,19 +64,8 @@ public void testRequestTopicResponseStaticQueue() throws 
Exception {
         sendAndReceiveMessages(ei, false);
     }
 
-    private Message createMessage() {
-        Message outMessage = new MessageImpl();
-        JMSMessageHeadersType header = new JMSMessageHeadersType();
-        header.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
-        header.setJMSPriority(1);
-        header.setTimeToLive(1000);
-        outMessage.put(JMSConstants.JMS_CLIENT_REQUEST_HEADERS, header);
-        outMessage.put(Message.ENCODING, "US-ASCII");
-        return outMessage;
-    }
-
-    protected void sendAndReceiveMessages(EndpointInfo ei, boolean 
synchronous) throws IOException {
-        inMessage = null;
+    private void sendAndReceiveMessages(EndpointInfo ei, boolean synchronous)
+            throws IOException, InterruptedException {
         // set up the conduit send to be true
         JMSConduit conduit = setupJMSConduitWithObserver(ei);
         final Message outMessage = createMessage();
@@ -120,9 +79,8 @@ public void onMessage(Message m) {
                 verifyReceivedMessage(m);
                 verifyHeaders(m, outMessage);
                 // setup the message for
-                Conduit backConduit;
                 try {
-                    backConduit = destination.getBackChannel(m);
+                    Conduit backConduit = destination.getBackChannel(m);
                     // wait for the message to be got from the conduit
                     Message replyMessage = new MessageImpl();
                     sendOneWayMessage(backConduit, replyMessage);
@@ -138,13 +96,11 @@ public void onMessage(Message m) {
             // wait for the message to be got from the destination,
             // create the thread to handler the Destination incoming message
 
-            waitForReceiveInMessage();
-            verifyReceivedMessage(inMessage);
+            verifyReceivedMessage(waitForReceiveInMessage());
         } finally {
             conduit.close();
             destination.shutdown();
         }
     }
 
-
-}
\ No newline at end of file
+}
diff --git 
a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/uri/URIConfiguredConduitTest.java
 
b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/uri/URIConfiguredConduitTest.java
index b97a2650801..d9f9e9c8c26 100644
--- 
a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/uri/URIConfiguredConduitTest.java
+++ 
b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/uri/URIConfiguredConduitTest.java
@@ -36,10 +36,12 @@
 import org.apache.cxf.transport.jms.util.TestReceiver;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 
-import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
 /**
  * Checks if a CXF client works correlates requests and responses correctly if 
the server sets the message id
  * as correlation id on the response message
@@ -102,20 +104,16 @@ public void sendAndReceive(SyncType syncType, String 
address) throws Exception {
 
         waitForAsyncReply(exchange);
         receiver.close();
-        if (exchange.getInMessage() == null) {
-            throw new RuntimeException("No reply received within 2 seconds");
-        }
+        assertNotNull("No reply received within 2 seconds", 
exchange.getInMessage());
         JMSMessageHeadersType inHeaders = 
(JMSMessageHeadersType)exchange.getInMessage()
             .get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS);
-        Assert.assertEquals(receiver.getRequestMessageId(), 
inHeaders.getJMSCorrelationID());
+        assertEquals(receiver.getRequestMessageId(), 
inHeaders.getJMSCorrelationID());
         conduit.close();
     }
 
-    private void waitForAsyncReply(Exchange exchange) throws 
InterruptedException {
-        int count = 0;
-        while (exchange.getInMessage() == null && count <= 20) {
-            Thread.sleep(100);
-            count++;
+    private static void waitForAsyncReply(Exchange exchange) throws 
InterruptedException {
+        for (int count = 0; exchange.getInMessage() == null && count <= 20; 
count++) {
+            Thread.sleep(100L);
         }
     }
 
diff --git 
a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
 
b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
index ccdb4501251..937f0b59aa7 100644
--- 
a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
+++ 
b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
@@ -40,7 +40,6 @@
 import org.awaitility.Awaitility;
 
 import org.easymock.Capture;
-import org.junit.Assert;
 import org.junit.Test;
 
 
@@ -50,6 +49,7 @@
 import static org.easymock.EasyMock.newCapture;
 import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.assertEquals;
 
 public class MessageListenerTest {
 
@@ -77,7 +77,7 @@ public void testConnectionProblem() throws JMSException {
         Awaitility.await().until(() -> !container.isRunning());
         verify(exListener);
         JMSException ex = captured.getValue();
-        Assert.assertEquals("The connection is already closed", 
ex.getMessage());
+        assertEquals("The connection is already closed", ex.getMessage());
     }
     
     @Test
@@ -106,7 +106,7 @@ public void testConnectionProblemXA() throws JMSException, 
XAException, Interrup
         verify(exListener);
         JMSException ex = captured.getValue();
         // Closing the pooled connection will result in a NPE when using it
-        Assert.assertEquals("Wrapped exception. null", ex.getMessage());
+        assertEquals("Wrapped exception. null", ex.getMessage());
     }
 
     @Test
@@ -147,14 +147,14 @@ public void testNoTransaction() throws JMSException, 
XAException, InterruptedExc
         container.setAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
         container.start();
 
-        assertNumMessagesInQueue("At the start the queue should be empty", 
connection, dest, 0, 0);
+        assertNumMessagesInQueue("At the start the queue should be empty", 
connection, dest, 0, 0L);
 
         sendMessage(connection, dest, OK);
-        assertNumMessagesInQueue("This message should be committed", 
connection, dest, 0, 1000);
+        assertNumMessagesInQueue("This message should be committed", 
connection, dest, 0, 1000L);
 
         sendMessage(connection, dest, FAIL);
         assertNumMessagesInQueue("Even when an exception occurs the message 
should be committed", connection,
-                                 dest, 0, 1000);
+                                 dest, 0, 1000L);
 
         container.stop();
         connection.close();
@@ -178,17 +178,17 @@ public void testLocalTransaction() throws JMSException, 
XAException, Interrupted
     private void testTransactionalBehaviour(Connection connection, Queue dest) 
throws JMSException,
         InterruptedException {
         Queue dlq = JMSUtil.createQueue(connection, "ActiveMQ.DLQ");
-        assertNumMessagesInQueue("At the start the queue should be empty", 
connection, dest, 0, 0);
-        assertNumMessagesInQueue("At the start the DLQ should be empty", 
connection, dlq, 0, 0);
+        assertNumMessagesInQueue("At the start the queue should be empty", 
connection, dest, 0, 0L);
+        assertNumMessagesInQueue("At the start the DLQ should be empty", 
connection, dlq, 0, 0L);
 
         sendMessage(connection, dest, OK);
-        assertNumMessagesInQueue("This message should be committed", 
connection, dest, 0, 1000);
+        assertNumMessagesInQueue("This message should be committed", 
connection, dest, 0, 1000L);
 
         sendMessage(connection, dest, FAILFIRST);
-        assertNumMessagesInQueue("Should succeed on second try", connection, 
dest, 0, 2000);
+        assertNumMessagesInQueue("Should succeed on second try", connection, 
dest, 0, 2000L);
 
         sendMessage(connection, dest, FAIL);
-        assertNumMessagesInQueue("Should be rolled back", connection, dlq, 1, 
2500);
+        assertNumMessagesInQueue("Should be rolled back", connection, dlq, 1, 
2500L);
     }
 
     private Connection createConnection(String name) throws JMSException {
@@ -227,25 +227,27 @@ protected void drainQueue(Connection connection, Queue 
dest) throws JMSException
         }
         consumer.close();
         session.close();
-        assertNumMessagesInQueue("", connection, dest, 0, 0);
+        assertNumMessagesInQueue("", connection, dest, 0, 0L);
     }
 
-    private void assertNumMessagesInQueue(String message, Connection 
connection, Queue queue,
-                                          int expectedNum, int timeout) throws 
JMSException,
+    private static void assertNumMessagesInQueue(String message, Connection 
connection, Queue queue,
+                                          int expectedNum, long timeout) 
throws JMSException,
         InterruptedException {
         long startTime = System.currentTimeMillis();
         int actualNum;
         do {
             actualNum = JMSUtil.getNumMessages(connection, queue);
-
+            if (actualNum == expectedNum) {
+                break;
+            }
             //System.out.println("Messages in queue " + queue.getQueueName() + 
": " + actualNum
             //                   + ", expecting: " + expectedNum);
-            Thread.sleep(100);
+            Thread.sleep(100L);
         } while ((System.currentTimeMillis() - startTime < timeout) && 
expectedNum != actualNum);
-        Assert.assertEquals(message + " -> number of messages on queue", 
expectedNum, actualNum);
+        assertEquals(message + " -> number of messages on queue", expectedNum, 
actualNum);
     }
 
-    private void sendMessage(Connection connection, Destination dest, String 
content) throws JMSException,
+    private static void sendMessage(Connection connection, Destination dest, 
String content) throws JMSException,
         InterruptedException {
         Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
         MessageProducer prod = session.createProducer(dest);
@@ -253,7 +255,7 @@ private void sendMessage(Connection connection, Destination 
dest, String content
         prod.send(message);
         prod.close();
         session.close();
-        Thread.sleep(500); // Give receiver some time to process
+//        Thread.sleep(500L); // Give receiver some time to process
     }
 
     private static final class TestMessageListener implements MessageListener {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to