Repository: qpid-broker-j
Updated Branches:
  refs/heads/master cd432bcf6 -> fbd973fd4


QPID-6933: [System Tests] Refactor queue producer flow control overflow policy 
tests as JMS 1.1 system test


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/fbd973fd
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/fbd973fd
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/fbd973fd

Branch: refs/heads/master
Commit: fbd973fd4f62f9edd1edc76175df63dcfd83f80a
Parents: cd432bc
Author: Alex Rudyy <[email protected]>
Authored: Fri Dec 29 19:24:43 2017 +0000
Committer: Alex Rudyy <[email protected]>
Committed: Fri Dec 29 19:24:43 2017 +0000

----------------------------------------------------------------------
 .../org/apache/qpid/systests/JmsTestBase.java   |  57 ++
 .../queue/ProducerFlowControlTest.java          | 582 +++++++++++++++++++
 .../server/queue/ProducerFlowControlTest.java   | 566 ------------------
 test-profiles/CPPExcludes                       |   1 -
 4 files changed, 639 insertions(+), 567 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/fbd973fd/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/JmsTestBase.java
----------------------------------------------------------------------
diff --git 
a/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/JmsTestBase.java
 
b/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/JmsTestBase.java
index 09177f5..a5c9553 100644
--- 
a/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/JmsTestBase.java
+++ 
b/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/JmsTestBase.java
@@ -201,6 +201,63 @@ public abstract class JmsTestBase extends 
BrokerAdminUsingTestBase
         return (Map<String, Object>) statistics;
     }
 
+    /**
+     * Updates a broker entity through AMQP management using a short-lived connection.
+     *
+     * @param entityName name of the entity to update
+     * @param entityType management type of the entity, for example "org.apache.qpid.Queue"
+     * @param attributes attribute names mapped to the values handed to the management facade
+     * @throws Exception anything raised while connecting or by the management facade
+     */
+    protected void updateEntityUsingAmqpManagement(final String entityName,
+                                                   final String entityType,
+                                                   final Map<String, Object> attributes)
+            throws Exception
+    {
+        Connection connection = getConnection();
+        try
+        {
+            connection.start();
+            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+            try
+            {
+                _managementFacade.updateEntityUsingAmqpManagement(entityName, session, entityType, attributes);
+            }
+            finally
+            {
+                // close the session explicitly, in the same way readEntityUsingAmqpManagement does
+                session.close();
+            }
+        }
+        finally
+        {
+            connection.close();
+        }
+    }
+
+    /**
+     * Deletes a broker entity through AMQP management using a short-lived connection.
+     *
+     * @param entityName name of the entity to delete
+     * @param entityType management type of the entity, for example "org.apache.qpid.Queue"
+     * @throws Exception anything raised while connecting or by the management facade
+     */
+    protected void deleteEntityUsingAmqpManagement(final String entityName,
+                                                   final String entityType)
+            throws Exception
+    {
+        Connection connection = getConnection();
+        try
+        {
+            connection.start();
+            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+            try
+            {
+                _managementFacade.deleteEntityUsingAmqpManagement(entityName, session, entityType);
+            }
+            finally
+            {
+                // close the session explicitly, in the same way readEntityUsingAmqpManagement does
+                session.close();
+            }
+        }
+        finally
+        {
+            connection.close();
+        }
+    }
+
+    protected Map<String, Object> readEntityUsingAmqpManagement(String type, 
String name, boolean actuals) throws Exception
+    {
+        Connection connection = getConnection();
+        try
+        {
+            connection.start();
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            try
+            {
+                return 
_managementFacade.readEntityUsingAmqpManagement(session, type, name, actuals);
+            }
+            finally
+            {
+                session.close();
+            }
+        }
+        finally
+        {
+            connection.close();
+        }
+    }
+
     protected TopicConnection getTopicConnection() throws JMSException, 
NamingException
     {
         return (TopicConnection) getConnection();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/fbd973fd/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/ProducerFlowControlTest.java
----------------------------------------------------------------------
diff --git 
a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/ProducerFlowControlTest.java
 
b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/ProducerFlowControlTest.java
new file mode 100644
index 0000000..6f76755
--- /dev/null
+++ 
b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/ProducerFlowControlTest.java
@@ -0,0 +1,582 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.systests.jms_1_1.extensions.queue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.junit.Test;
+
+import org.apache.qpid.server.model.OverflowPolicy;
+import org.apache.qpid.systests.JmsTestBase;
+
+public class ProducerFlowControlTest extends JmsTestBase
+{
+
+    /**
+     * Verifies that a producer blocks once the queue capacity is exceeded and is released only after the
+     * queue depth has fallen to the resume capacity.
+     * <p>
+     * The capacity is 3.5 messages and the resume capacity is 2 messages: the 4th message overfills the queue,
+     * the 5th is held back, and two messages have to be consumed before the 5th is sent.
+     */
+    @Test
+    public void testCapacityExceededCausesBlock() throws Exception
+    {
+        String queueName = getTestName();
+        int messageSize = evaluateMessageSize();
+        int capacity = messageSize * 3 + messageSize / 2;
+        int resumeCapacity = messageSize * 2;
+
+        Queue queue = createAndBindQueueWithFlowControlEnabled(queueName, capacity, resumeCapacity);
+
+        Connection producerConnection = getConnectionBuilder().setSyncPublish(true).build();
+        try
+        {
+            Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = producerSession.createProducer(queue);
+
+            // try to send 5 messages (should block after 4)
+            MessageSender messageSender = sendMessagesAsync(producer, producerSession, 5);
+
+            assertTrue("Flow is not stopped", awaitAttributeValue(queueName, "queueFlowStopped", true, 5000));
+            assertEquals("Incorrect number of message sent before blocking",
+                         4,
+                         messageSender.getNumberOfSentMessages());
+
+            Connection consumerConnection = getConnection();
+            try
+            {
+                Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                MessageConsumer consumer = consumerSession.createConsumer(queue);
+                consumerConnection.start();
+
+                Message message = consumer.receive(getReceiveTimeout());
+                assertNotNull("Message is not received", message);
+
+                // the flow must stay stopped: one consumed message is not enough to reach the resume capacity
+                assertFalse("Flow is not stopped", awaitAttributeValue(queueName, "queueFlowStopped", false, 1000));
+
+                assertEquals("Message incorrectly sent after one message received",
+                             4,
+                             messageSender.getNumberOfSentMessages());
+
+                Message message2 = consumer.receive(getReceiveTimeout());
+                assertNotNull("Message is not received", message2);
+                assertTrue("Message sending is not finished",
+                           messageSender.getSendLatch().await(1000, TimeUnit.MILLISECONDS));
+                // the latch is released on failure as well, so report a sender failure explicitly
+                assertNull("Unexpected exception from message sender", messageSender.getException());
+                assertEquals("Message not sent after two messages received",
+                             5,
+                             messageSender.getNumberOfSentMessages());
+            }
+            finally
+            {
+                consumerConnection.close();
+            }
+        }
+        finally
+        {
+            producerConnection.close();
+        }
+    }
+
+    /**
+     * Verifies flow control when the resume capacity equals the capacity: consuming a single message from
+     * the overfull queue is enough to release the blocked producer.
+     */
+    @Test
+    public void testFlowControlOnCapacityResumeEqual() throws Exception
+    {
+        String queueName = getTestName();
+        int messageSize = evaluateMessageSize();
+        int capacity = messageSize * 3 + messageSize / 2;
+        Queue queue = createAndBindQueueWithFlowControlEnabled(queueName, capacity, capacity);
+
+        Connection producerConnection = getConnectionBuilder().setSyncPublish(true).build();
+        try
+        {
+            Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = producerSession.createProducer(queue);
+
+            // try to send 5 messages (should block after 4)
+            MessageSender messageSender = sendMessagesAsync(producer, producerSession, 5);
+
+            assertTrue("Flow is not stopped", awaitAttributeValue(queueName, "queueFlowStopped", true, 5000));
+
+            assertEquals("Incorrect number of message sent before blocking",
+                         4,
+                         messageSender.getNumberOfSentMessages());
+
+            Connection consumerConnection = getConnection();
+            try
+            {
+                Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                MessageConsumer consumer = consumerSession.createConsumer(queue);
+                consumerConnection.start();
+
+                Message message = consumer.receive(getReceiveTimeout());
+                assertNotNull("Message is not received", message);
+
+                assertTrue("Message sending is not finished",
+                           messageSender.getSendLatch().await(1000, TimeUnit.MILLISECONDS));
+                // the latch is released on failure as well, so report a sender failure explicitly
+                assertNull("Unexpected exception from message sender", messageSender.getException());
+
+                assertEquals("Message not sent after one message received",
+                             5,
+                             messageSender.getNumberOfSentMessages());
+            }
+            finally
+            {
+                consumerConnection.close();
+            }
+        }
+        finally
+        {
+            producerConnection.close();
+        }
+    }
+
+    /**
+     * Soak test: ten producers each publish one hundred messages to a flow-controlled queue while a single
+     * consumer drains it. Every message must arrive and no extra message may be received.
+     */
+    @Test
+    public void testFlowControlSoak() throws Exception
+    {
+        final String queueName = getTestName();
+        int messageSize = evaluateMessageSize();
+        int capacity = messageSize * 20;
+        final Queue queue = createAndBindQueueWithFlowControlEnabled(queueName, capacity, capacity / 2);
+
+        final int numProducers = 10;
+        final int numMessages = 100;
+
+        Connection consumerConnection = getConnection();
+        try
+        {
+            Connection[] producerConnections = new Connection[numProducers];
+            try
+            {
+                // open the connections inside the try block so that a failure part-way through
+                // still closes the connections that were already opened
+                for (int i = 0; i < numProducers; i++)
+                {
+                    producerConnections[i] = getConnection();
+                }
+
+                AtomicInteger messageCounter = new AtomicInteger();
+                for (int i = 0; i < numProducers; i++)
+                {
+                    producerConnections[i].start();
+                    Session session = producerConnections[i].createSession(false, Session.AUTO_ACKNOWLEDGE);
+                    sendMessagesAsync(session.createProducer(queue), session, numMessages, messageCounter);
+                }
+
+                Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                MessageConsumer consumer = consumerSession.createConsumer(queue);
+                consumerConnection.start();
+
+                for (int j = 0; j < numProducers * numMessages; j++)
+                {
+                    Message msg = consumer.receive(getReceiveTimeout());
+                    assertNotNull("Message not received(" + j + "), sent: " + messageCounter.get(), msg);
+                }
+
+                Message msg = consumer.receive(getReceiveTimeout() / 4);
+                assertNull("extra message received", msg);
+            }
+            finally
+            {
+                for (int i = 0; i < numProducers; i++)
+                {
+                    if (producerConnections[i] != null)
+                    {
+                        producerConnections[i].close();
+                    }
+                }
+            }
+        }
+        finally
+        {
+            consumerConnection.close();
+        }
+    }
+
+    /**
+     * Verifies that changing the flow control limits of an existing queue through management takes effect:
+     * lowering the limits blocks the producer, raising them releases it, and exceeding the raised limits
+     * blocks it again until the queue is drained.
+     */
+    @Test
+    public void testFlowControlAttributeModificationViaManagement() throws Exception
+    {
+        final String queueName = getTestName();
+        int messageSize = evaluateMessageSize();
+        final Queue queue = createAndBindQueueWithFlowControlEnabled(queueName, 0, 0);
+
+        //set new values that will cause flow control to be active, and the queue to become overfull after 1 message is sent
+        setFlowLimits(queueName, messageSize / 2, messageSize / 2);
+        assertFalse("Queue should not be overfull", isFlowStopped(queueName));
+
+        Connection producerConnection = getConnectionBuilder().setSyncPublish(true).build();
+        try
+        {
+            Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = producerSession.createProducer(queue);
+
+            // try to send 2 messages (should block after 1)
+            final MessageSender sender = sendMessagesAsync(producer, producerSession, 2);
+
+            assertTrue("Flow is not stopped", awaitAttributeValue(queueName, "queueFlowStopped", true, 2000));
+
+            assertEquals("Incorrect number of message sent before blocking", 1, sender.getNumberOfSentMessages());
+            assertTrue("Queue should be overfull", isFlowStopped(queueName));
+
+            int queueDepthBytes = getQueueDepthBytes(queueName);
+            //raise the attribute values, causing the queue to become underfull and allow the second message to be sent.
+            setFlowLimits(queueName, queueDepthBytes * 2 + queueDepthBytes / 2, queueDepthBytes + queueDepthBytes / 2);
+
+            assertTrue("Flow is stopped", awaitAttributeValue(queueName, "queueFlowStopped", false, 2000));
+
+            //check second message was sent
+            assertEquals("Second message was not sent after lifting FlowResumeCapacity",
+                         2,
+                         sender.getNumberOfSentMessages());
+            assertFalse("Queue should not be overfull", isFlowStopped(queueName));
+
+            // try to send another message to block flow
+            final MessageSender sender2 = sendMessagesAsync(producer, producerSession, 1);
+
+            assertTrue("Flow is not stopped", awaitAttributeValue(queueName, "queueFlowStopped", true, 2000));
+            assertEquals("Incorrect number of message sent before blocking", 1, sender2.getNumberOfSentMessages());
+            assertTrue("Queue should be overfull", isFlowStopped(queueName));
+
+            Connection consumerConnection = getConnection();
+            try
+            {
+                Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                MessageConsumer consumer = consumerSession.createConsumer(queue);
+                consumerConnection.start();
+
+                Message message = consumer.receive(getReceiveTimeout());
+                assertNotNull("Message is not received", message);
+
+                message = consumer.receive(getReceiveTimeout());
+                assertNotNull("Second message is not received", message);
+
+                assertTrue("Flow is stopped", awaitAttributeValue(queueName, "queueFlowStopped", false, 2000));
+
+                // three messages were published in total, so one more is still on the queue
+                assertNotNull("Should have received third message", consumer.receive(getReceiveTimeout()));
+            }
+            finally
+            {
+                consumerConnection.close();
+            }
+        }
+        finally
+        {
+            producerConnection.close();
+        }
+    }
+
+    /**
+     * Verifies that flow control engages on enqueue for a queue whose capacity is a single byte, and that
+     * messages can still be consumed from it afterwards.
+     */
+    @Test
+    public void testProducerFlowControlIsTriggeredOnEnqueue() throws Exception
+    {
+        final String name = getTestName();
+        final Queue destination = createAndBindQueueWithFlowControlEnabled(name, 1, 0);
+
+        Connection publishingConnection = getConnectionBuilder().setSyncPublish(true).build();
+        try
+        {
+            Session publishingSession = publishingConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer publisher = publishingSession.createProducer(destination);
+
+            Message firstMessage = nextMessage(0, publishingSession);
+            publisher.send(firstMessage);
+
+            // try to send 2 messages (should block after 1)
+            sendMessagesAsync(publisher, publishingSession, 2);
+
+            boolean flowStopped = awaitAttributeValue(name, "queueFlowStopped", true, 2000);
+            assertTrue("Flow is not stopped", flowStopped);
+
+            Connection receivingConnection = getConnection();
+            try
+            {
+                Session receivingSession = receivingConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                MessageConsumer receiver = receivingSession.createConsumer(destination);
+                receivingConnection.start();
+
+                Message received = receiver.receive(getReceiveTimeout());
+                assertNotNull("Message is not received", received);
+
+                Message receivedNext = receiver.receive(getReceiveTimeout());
+                assertNotNull("Second message is not received", receivedNext);
+            }
+            finally
+            {
+                receivingConnection.close();
+            }
+        }
+        finally
+        {
+            publishingConnection.close();
+        }
+    }
+
+    /**
+     * Verifies that a queue can be deleted through management while a producer is blocked on it, and that a
+     * queue re-created under the same name starts out empty.
+     */
+    @Test
+    public void testQueueDeleteWithBlockedFlow() throws Exception
+    {
+        final String name = getTestName();
+        final int singleMessageSize = evaluateMessageSize();
+        final int blockAt = singleMessageSize * 3 + singleMessageSize / 2;
+        final int resumeAt = singleMessageSize * 2;
+
+        final Queue destination = createAndBindQueueWithFlowControlEnabled(name, blockAt, resumeAt);
+
+        final Connection publishingConnection = getConnectionBuilder().setSyncPublish(true).build();
+        try
+        {
+            final Session publishingSession = publishingConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            final MessageProducer publisher = publishingSession.createProducer(destination);
+
+            // try to send 5 messages (should block after 4)
+            final MessageSender asyncSender = sendMessagesAsync(publisher, publishingSession, 5);
+
+            final boolean flowStopped = awaitAttributeValue(name, "queueFlowStopped", true, 5000);
+            assertTrue("Flow is not stopped", flowStopped);
+
+            assertEquals("Incorrect number of message sent before blocking",
+                         4,
+                         asyncSender.getNumberOfSentMessages());
+
+            deleteEntityUsingAmqpManagement(name, "org.apache.qpid.Queue");
+
+            createQueue(name);
+
+            final int depthAfterRecreate = getQueueDepthBytes(name);
+            assertEquals("Unexpected queue depth", 0, depthAfterRecreate);
+        }
+        finally
+        {
+            publishingConnection.close();
+        }
+    }
+
+    private int getQueueDepthBytes(final String queueName) throws Exception
+    {
+        Map<String, Object> arguments =
+                Collections.singletonMap("statistics", 
Collections.singletonList("queueDepthBytes"));
+        Object statistics = performOperationUsingAmqpManagement(queueName,
+                                                                
"getStatistics",
+                                                                
"org.apache.qpid.Queue",
+                                                                arguments);
+        assertNotNull("Statistics is null", statistics);
+        assertTrue("Statistics is not map", statistics instanceof Map);
+        @SuppressWarnings("unchecked")
+        Map<String, Object> statisticsMap = (Map<String, Object>) statistics;
+        assertTrue("queueDepthBytes is not present", 
statisticsMap.get("queueDepthBytes") instanceof Number);
+        return ((Number) statisticsMap.get("queueDepthBytes")).intValue();
+    }
+
+    /**
+     * Reconfigures the flow control limits of an existing queue through management.
+     *
+     * @param queueName   name of the queue to update
+     * @param blockValue  new maximum queue depth in bytes
+     * @param resumeValue depth in bytes from which the resume limit percentage is derived
+     * @throws Exception anything raised by the management update
+     */
+    private void setFlowLimits(final String queueName, final int blockValue, final int resumeValue) throws Exception
+    {
+        // the resume limit is handed over as a percentage inside the queue context
+        final String resumeLimitContext =
+                String.format("{\"%s\": %s}",
+                              org.apache.qpid.server.model.Queue.QUEUE_FLOW_RESUME_LIMIT,
+                              getFlowResumeLimit(blockValue, resumeValue));
+
+        final Map<String, Object> queueAttributes = new HashMap<>();
+        queueAttributes.put(org.apache.qpid.server.model.Queue.MAXIMUM_QUEUE_DEPTH_BYTES, blockValue);
+        queueAttributes.put(org.apache.qpid.server.model.Queue.OVERFLOW_POLICY,
+                            OverflowPolicy.PRODUCER_FLOW_CONTROL.name());
+        queueAttributes.put(org.apache.qpid.server.model.Queue.CONTEXT, resumeLimitContext);
+
+        updateEntityUsingAmqpManagement(queueName, "org.apache.qpid.Queue", queueAttributes);
+    }
+
+    /**
+     * Expresses the resume capacity as a percentage of the maximum capacity, with two decimal places.
+     * <p>
+     * The result is embedded in a JSON context string by the callers, so it is formatted with
+     * {@code Locale.ROOT}: the default locale could otherwise yield a decimal comma such as "57,14".
+     *
+     * @param maximumCapacity queue capacity; callers are expected to pass a non-zero value
+     * @param resumeCapacity  capacity at which the flow is resumed
+     * @return the percentage formatted with a '.' decimal separator, for example "57.14"
+     */
+    private String getFlowResumeLimit(final double maximumCapacity, final double resumeCapacity)
+    {
+        double ratio = resumeCapacity / maximumCapacity;
+        return String.format(java.util.Locale.ROOT, "%.2f", ratio * 100.0);
+    }
+
+    private boolean isFlowStopped(final String queueName) throws Exception
+    {
+        Map<String, Object> attributes = 
readEntityUsingAmqpManagement("org.apache.qpid.Queue", queueName, false);
+        return Boolean.TRUE.equals(attributes.get("queueFlowStopped"));
+    }
+
+
+    /**
+     * Creates a queue with the producer flow control overflow policy through management.
+     *
+     * @param queueName      name of the queue to create
+     * @param capacity       maximum queue depth in bytes; when zero no resume limit is put into the context
+     * @param resumeCapacity depth in bytes from which the resume limit percentage is derived
+     * @return the value of {@code createQueue(queueName)} for the new queue
+     * @throws Exception anything raised while creating the queue
+     */
+    private Queue createAndBindQueueWithFlowControlEnabled(String queueName,
+                                                           int capacity,
+                                                           int resumeCapacity) throws Exception
+    {
+        final Map<String, Object> queueAttributes = new HashMap<>();
+        queueAttributes.put(org.apache.qpid.server.model.Queue.MAXIMUM_QUEUE_DEPTH_BYTES, capacity);
+        queueAttributes.put(org.apache.qpid.server.model.Queue.OVERFLOW_POLICY,
+                            OverflowPolicy.PRODUCER_FLOW_CONTROL.name());
+
+        final boolean hasCapacity = capacity != 0;
+        if (hasCapacity)
+        {
+            // a percentage can only be derived from a non-zero capacity
+            final String resumeLimitContext =
+                    String.format("{\"%s\": %s}",
+                                  org.apache.qpid.server.model.Queue.QUEUE_FLOW_RESUME_LIMIT,
+                                  getFlowResumeLimit(capacity, resumeCapacity));
+            queueAttributes.put(org.apache.qpid.server.model.Queue.CONTEXT, resumeLimitContext);
+        }
+
+        createEntityUsingAmqpManagement(queueName, "org.apache.qpid.Queue", queueAttributes);
+        return createQueue(queueName);
+    }
+
+    private MessageSender sendMessagesAsync(final MessageProducer producer,
+                                            final Session producerSession,
+                                            final int numMessages)
+    {
+        return sendMessagesAsync(producer, producerSession, numMessages, null);
+    }
+
+    private MessageSender sendMessagesAsync(final MessageProducer producer,
+                                            final Session producerSession,
+                                            final int numMessages,
+                                            final AtomicInteger messageCounter)
+    {
+        MessageSender sender = new MessageSender(producer, producerSession, 
numMessages, messageCounter);
+        new Thread(sender).start();
+        return sender;
+    }
+
+    /** Shared 300-byte payload written into every test message; it is never modified. */
+    private static final byte[] BYTE_300 = new byte[300];
+
+    /**
+     * Builds a bytes message carrying the 300-byte payload and an int property "msg".
+     *
+     * @param msg             value stored in the "msg" property
+     * @param producerSession session used to create the message
+     * @return the populated message
+     * @throws JMSException if the message cannot be created or populated
+     */
+    private Message nextMessage(int msg, Session producerSession) throws JMSException
+    {
+        BytesMessage send = producerSession.createBytesMessage();
+        send.writeBytes(BYTE_300);
+        send.setIntProperty("msg", msg);
+        return send;
+    }
+
+    private boolean awaitAttributeValue(String queueName, String 
attributeName, Object expectedValue, long timeout)
+            throws Exception
+    {
+        long startTime = System.currentTimeMillis();
+        long endTime = startTime + timeout;
+        boolean found = false;
+        do
+        {
+            Map<String, Object> attributes = 
readEntityUsingAmqpManagement("org.apache.qpid.Queue", queueName, false);
+            Object actualValue = attributes.get(attributeName);
+            if (expectedValue == null)
+            {
+                found = actualValue == null;
+            }
+            else if (actualValue != null)
+            {
+                if (actualValue.getClass() == expectedValue.getClass())
+                {
+                    found = expectedValue.equals(actualValue);
+                }
+                else
+                {
+                    found = 
String.valueOf(expectedValue).equals(String.valueOf(actualValue));
+                }
+            }
+
+            if (!found)
+            {
+                Thread.sleep(50);
+            }
+        } while (!found && System.currentTimeMillis() <= endTime);
+        return found;
+    }
+
+    /**
+     * Measures the size of one test message as accounted by the broker: a single message is published
+     * transactionally to a scratch queue and the resulting queue depth in bytes is returned.
+     *
+     * @return the depth in bytes of the scratch queue after one message has been committed
+     * @throws Exception anything raised while publishing or reading the statistic
+     */
+    private int evaluateMessageSize() throws Exception
+    {
+        final String scratchQueueName = getTestName() + "_Tmp";
+        final Queue scratchQueue = createQueue(scratchQueueName);
+        final Connection measuringConnection = getConnection();
+        try
+        {
+            measuringConnection.start();
+            final Session transactedSession = measuringConnection.createSession(true, Session.SESSION_TRANSACTED);
+            final MessageProducer scratchProducer = transactedSession.createProducer(scratchQueue);
+            final Message probe = nextMessage(0, transactedSession);
+            scratchProducer.send(probe);
+            transactedSession.commit();
+            return getQueueDepthBytes(scratchQueueName);
+        }
+        finally
+        {
+            measuringConnection.close();
+        }
+    }
+
+    /**
+     * Runnable that publishes a fixed number of messages and records its progress.
+     * <p>
+     * The number of sent messages is counted, any exception thrown while sending is stored rather than
+     * propagated, and the latch is released when the run ends, whether it succeeded or failed.
+     */
+    private class MessageSender implements Runnable
+    {
+        private final AtomicInteger _sentMessages;
+        private final MessageProducer _senderProducer;
+        private final Session _senderSession;
+        private final int _numMessages;
+        private volatile Exception _exception;
+        private final CountDownLatch _sendLatch = new CountDownLatch(1);
+
+        /**
+         * @param producer        producer used for publishing
+         * @param producerSession session that creates the messages
+         * @param numMessages     number of messages to send
+         * @param messageCounter  counter to increment per sent message, or null to use a private counter
+         */
+        MessageSender(MessageProducer producer, Session producerSession, int numMessages, AtomicInteger messageCounter)
+        {
+            _senderProducer = producer;
+            _senderSession = producerSession;
+            _numMessages = numMessages;
+            _sentMessages = messageCounter == null ? new AtomicInteger(0) : messageCounter;
+        }
+
+        @Override
+        public void run()
+        {
+            try
+            {
+                sendMessages(_senderProducer, _senderSession, _numMessages);
+            }
+            catch (Exception e)
+            {
+                // kept for inspection through getException()
+                _exception = e;
+            }
+            finally
+            {
+                _sendLatch.countDown();
+            }
+        }
+
+        /** @return the latch that is released once {@link #run()} has ended */
+        CountDownLatch getSendLatch()
+        {
+            return _sendLatch;
+        }
+
+        /** @return the current value of the message counter */
+        int getNumberOfSentMessages()
+        {
+            return _sentMessages.get();
+        }
+
+        /** @return the exception that ended the run, or null if none has been recorded */
+        Exception getException()
+        {
+            return _exception;
+        }
+
+        private void sendMessages(MessageProducer producer, Session producerSession, int numMessages)
+                throws JMSException
+        {
+
+            for (int msg = 0; msg < numMessages; msg++)
+            {
+                producer.send(nextMessage(msg, producerSession));
+                _sentMessages.incrementAndGet();
+
+                // Cause work that causes a synchronous interaction on the wire.  We need to be
+                // sure that the client has received the flow/message.stop etc.
+                producerSession.createTemporaryQueue().delete();
+            }
+        }
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/fbd973fd/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
----------------------------------------------------------------------
diff --git 
a/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
 
b/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
deleted file mode 100644
index 062479c..0000000
--- 
a/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
+++ /dev/null
@@ -1,566 +0,0 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-package org.apache.qpid.server.queue;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.server.logging.AbstractTestLogging;
-import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.model.LifetimePolicy;
-import org.apache.qpid.server.model.OverflowPolicy;
-import org.apache.qpid.systest.rest.RestTestHelper;
-import org.apache.qpid.test.utils.TestBrokerConfiguration;
-
-public class ProducerFlowControlTest extends AbstractTestLogging
-{
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(ProducerFlowControlTest.class);
-
-    private Connection _producerConnection;
-    private Connection _consumerConnection;
-    private Session _producerSession;
-    private Session _consumerSession;
-    private MessageProducer _producer;
-    private MessageConsumer _consumer;
-    private Queue _queue;
-    private RestTestHelper _restTestHelper;
-
-    private final AtomicInteger _sentMessages = new AtomicInteger(0);
-    private int _messageSizeIncludingHeader;
-    private Session _utilitySession;
-
-    @Override
-    public void setUp() throws Exception
-    {
-        getDefaultBrokerConfiguration().addHttpManagementConfiguration();
-        super.setUp();
-    }
-
-    @Override
-    public void startDefaultBroker()
-    {
-        // broker start-up is delegated to the tests
-    }
-
-    private void init() throws Exception
-    {
-        super.startDefaultBroker();
-        _restTestHelper = new RestTestHelper(getDefaultBroker().getHttpPort());
-        _monitor.markDiscardPoint();
-
-        if (!isBroker10())
-        {
-            setSystemProperty("sync_publish", "all");
-        }
-
-        _producerConnection = getConnection();
-        _producerSession = _producerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-
-        _producerConnection.start();
-
-        _consumerConnection = getConnection();
-        _consumerSession = _consumerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-
-        final Connection utilityConnection = getConnection();
-        utilityConnection.start();
-        _utilitySession = utilityConnection.createSession(true, 
Session.SESSION_TRANSACTED);
-        String tmpQueueName = getTestQueueName() + "_Tmp";
-        Queue tmpQueue = createTestQueue(_utilitySession, tmpQueueName);
-        MessageProducer tmpQueueProducer= 
_utilitySession.createProducer(tmpQueue);
-        tmpQueueProducer.send(nextMessage(0, _utilitySession));
-        _utilitySession.commit();
-
-        _messageSizeIncludingHeader = getQueueDepthBytes(tmpQueueName);
-    }
-
-    public void testCapacityExceededCausesBlock() throws Exception
-    {
-        init();
-        String queueName = getTestQueueName();
-
-        int capacity = _messageSizeIncludingHeader * 3 + 
_messageSizeIncludingHeader / 2;
-        int resumeCapacity = _messageSizeIncludingHeader * 2;
-        createAndBindQueueWithFlowControlEnabled(_producerSession, queueName, 
capacity, resumeCapacity);
-        _producer = _producerSession.createProducer(_queue);
-
-        // try to send 5 messages (should block after 4)
-        CountDownLatch sendLatch = sendMessagesAsync(_producer, 
_producerSession, 5).getSendLatch();
-
-        assertTrue("Flow is not stopped", awaitAttributeValue(queueName, 
"queueFlowStopped", true, 5000));
-        assertEquals("Incorrect number of message sent before blocking", 4, 
_sentMessages.get());
-
-        _consumer = _consumerSession.createConsumer(_queue);
-        _consumerConnection.start();
-
-        Message message = _consumer.receive(RECEIVE_TIMEOUT);
-        assertNotNull("Message is not received", message);
-
-        assertFalse("Flow is not stopped", awaitAttributeValue(queueName, 
"queueFlowStopped", false, 1000));
-
-        assertEquals("Message incorrectly sent after one message received", 4, 
_sentMessages.get());
-
-        Message message2 = _consumer.receive(RECEIVE_TIMEOUT);
-        assertNotNull("Message is not received", message2);
-        assertTrue("Message sending is not finished", sendLatch.await(1000, 
TimeUnit.MILLISECONDS));
-        assertEquals("Message not sent after two messages received", 5, 
_sentMessages.get());
-    }
-
-
-    public void testBrokerLogMessages() throws Exception
-    {
-        init();
-        String queueName = getTestQueueName();
-
-        int capacity = _messageSizeIncludingHeader * 3 + 
_messageSizeIncludingHeader / 2;
-        int resumeCapacity = _messageSizeIncludingHeader * 2;
-
-        createAndBindQueueWithFlowControlEnabled(_producerSession, queueName, 
capacity, resumeCapacity);
-        _producer = _producerSession.createProducer(_queue);
-
-        // try to send 5 messages (should block after 4)
-        sendMessagesAsync(_producer, _producerSession, 5);
-
-        List<String> results = waitAndFindMatches("QUE-1003", 7000);
-
-        assertEquals("Did not find correct number of QUE-1003 queue overfull messages", 1, results.size());
-
-        _consumer = _consumerSession.createConsumer(_queue);
-        _consumerConnection.start();
-
-
-        while(_consumer.receive(1000) != null) {};
-
-        results = waitAndFindMatches("QUE-1004");
-
-        assertEquals("Did not find correct number of UNDERFULL queue underfull messages", 1, results.size());
-    }
-
-    public void testFlowControlOnCapacityResumeEqual() throws Exception
-    {
-        init();
-        String queueName = getTestQueueName();
-
-        int capacity = _messageSizeIncludingHeader * 3 + 
_messageSizeIncludingHeader / 2;
-        createAndBindQueueWithFlowControlEnabled(_producerSession, queueName,
-                                                 capacity,
-                                                 capacity);
-        _producer = _producerSession.createProducer(_queue);
-
-
-        // try to send 5 messages (should block after 4)
-        CountDownLatch sendLatch = sendMessagesAsync(_producer, 
_producerSession, 5).getSendLatch();
-
-        assertTrue("Flow is not stopped", awaitAttributeValue(queueName, 
"queueFlowStopped", true,5000));
-
-        assertEquals("Incorrect number of message sent before blocking", 4, 
_sentMessages.get());
-
-        _consumer = _consumerSession.createConsumer(_queue);
-        _consumerConnection.start();
-
-        Message message = _consumer.receive(RECEIVE_TIMEOUT);
-        assertNotNull("Message is not received", message);
-
-        assertTrue("Message sending is not finished", sendLatch.await(1000, 
TimeUnit.MILLISECONDS));
-
-        assertEquals("Message incorrectly sent after one message received", 5, 
_sentMessages.get());
-        
-
-    }
-
-
-    public void testFlowControlSoak() throws Exception
-    {
-        init();
-        String queueName = getTestQueueName();
-        
-
-        final int numProducers = 10;
-        final int numMessages = 100;
-
-        final int capacity = _messageSizeIncludingHeader * 20;
-
-        createAndBindQueueWithFlowControlEnabled(_producerSession, queueName, 
capacity, capacity/2);
-
-        _consumerConnection.start();
-
-        Connection[] producers = new Connection[numProducers];
-        for(int i = 0 ; i < numProducers; i ++)
-        {
-
-            producers[i] = getConnection();
-            producers[i].start();
-            Session session = producers[i].createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-
-            MessageProducer myproducer = session.createProducer(_queue);
-            MessageSender sender = sendMessagesAsync(myproducer, session, 
numMessages);
-        }
-
-        _consumer = _consumerSession.createConsumer(_queue);
-        _consumerConnection.start();
-
-        for(int j = 0; j < numProducers * numMessages; j++)
-        {
-        
-            Message msg = _consumer.receive(5000);
-            assertNotNull("Message not received("+j+"), sent: "+_sentMessages.get(), msg);
-
-        }
-
-
-
-        Message msg = _consumer.receive(500);
-        assertNull("extra message received", msg);
-
-
-        for(int i = 0; i < numProducers; i++)
-        {
-            producers[i].close();
-        }
-
-    }
-
-    public void testFlowControlAttributeModificationViaREST() throws Exception
-    {
-        init();
-        String queueName = getTestQueueName();
-
-        createAndBindQueueWithFlowControlEnabled(_producerSession, queueName, 
0, 0);
-        _producer = _producerSession.createProducer(_queue);
-        
-        String queueUrl = String.format("queue/%1$s/%1$s/%2$s", 
TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, queueName);
-
-        //check current attribute values are 0 as expected
-        Map<String, Object> queueAttributes = 
_restTestHelper.getJsonAsMap(queueUrl);
-        assertEquals("Capacity was not the expected value", 0,
-                     ((Number) 
queueAttributes.get(org.apache.qpid.server.model.Queue.MAXIMUM_QUEUE_DEPTH_BYTES)).intValue());
-
-        //set new values that will cause flow control to be active, and the queue to become overfull after 1 message is sent
-        setFlowLimits(queueUrl, 250, 250);
-        assertFalse("Queue should not be overfull", isFlowStopped(queueUrl));
-
-        // try to send 2 messages (should block after 1)
-        sendMessagesAsync(_producer, _producerSession, 2);
-
-        waitForFlowControlAndMessageCount(queueUrl, 1, 2000);
-
-        //check only 1 message was sent, and queue is overfull
-        assertEquals("Incorrect number of message sent before blocking", 1, 
_sentMessages.get());
-        assertTrue("Queue should be overfull", isFlowStopped(queueUrl));
-
-        int queueDepthBytes = getQueueDepthBytes(queueName);
-        //raise the attribute values, causing the queue to become underfull and allow the second message to be sent.
-        setFlowLimits(queueUrl, queueDepthBytes + 200, queueDepthBytes);
-
-        waitForFlowControlAndMessageCount(queueUrl, 2, 2000);
-
-        //check second message was sent, and caused the queue to become overfull again
-        assertEquals("Second message was not sent after lifting FlowResumeCapacity", 2, _sentMessages.get());
-        assertTrue("Queue should be overfull", isFlowStopped(queueUrl));
-
-        //raise capacity above queue depth, check queue remains overfull as FlowResumeCapacity still exceeded
-        setFlowLimits(queueUrl, 2 * queueDepthBytes + 100, queueDepthBytes);
-        assertTrue("Queue should be overfull", isFlowStopped(queueUrl));
-
-        //receive a message, check queue becomes underfull
-        
-        _consumer = _consumerSession.createConsumer(_queue);
-        _consumerConnection.start();
-        
-        assertNotNull("Should have received first message", 
_consumer.receive(RECEIVE_TIMEOUT));
-
-        if(!isBroker10())
-        {
-            //perform a synchronous op on the connection
-            ((AMQSession<?, ?>) _consumerSession).sync();
-        }
-
-        _restTestHelper.waitForAttributeChanged(queueUrl, 
org.apache.qpid.server.model.Queue.QUEUE_FLOW_STOPPED, false);
-
-        assertNotNull("Should have received second message", 
_consumer.receive(RECEIVE_TIMEOUT));
-    }
-
-    public void testProducerFlowControlIsTriggeredOnEnqueue() throws Exception
-    {
-        long oneHourMilliseconds = 60 * 60 * 1000L;
-        setSystemProperty("virtualhost.housekeepingCheckPeriod", 
String.valueOf(oneHourMilliseconds));
-        super.startDefaultBroker();
-        _restTestHelper = new RestTestHelper(getDefaultBroker().getHttpPort());
-
-        Connection connection = getConnection();
-        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-
-        String queueName = getTestQueueName();
-        createAndBindQueueWithFlowControlEnabled(session, queueName, 1, 0, 
true, false);
-
-        sendMessage(session, _queue, 1);
-
-        String queueUrl = String.format("queue/%1$s/%1$s/%2$s", 
TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, queueName);
-        waitForFlowControlAndMessageCount(queueUrl, 1, 2000);
-
-        assertTrue("Message flow is not stopped", isFlowStopped(queueUrl));
-    }
-
-    private int getQueueDepthBytes(final String queueName) throws IOException
-    {
-        // On AMQP 1.0 the size of the message on the broker is not necessarily the size of the message we sent. Therefore, get the actual size from the broker
-        final String requestUrl = String.format("queue/%1$s/%1$s/%2$s/getStatistics?statistics=[\"queueDepthBytes\"]", TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, queueName);
-        final Map<String, Object> queueAttributes = 
_restTestHelper.getJsonAsMap(requestUrl);
-        return ((Number) queueAttributes.get("queueDepthBytes")).intValue();
-    }
-
-    private void waitForFlowControlAndMessageCount(final String queueUrl, 
final int messageCount, final int timeout) throws InterruptedException, 
IOException
-    {
-        int timeWaited = 0;
-        while (timeWaited < timeout && (!isFlowStopped(queueUrl) || 
_sentMessages.get() != messageCount))
-        {
-            Thread.sleep(50);
-            timeWaited += 50;
-        }
-    }
-
-    private void setFlowLimits(final String queueUrl, final int blockValue, 
final int resumeValue) throws IOException
-    {
-        final Map<String, Object> attributes = new HashMap<>();
-        attributes.put(org.apache.qpid.server.model.Queue.MAXIMUM_QUEUE_DEPTH_BYTES, blockValue);
-        attributes.put(org.apache.qpid.server.model.Queue.OVERFLOW_POLICY, OverflowPolicy.PRODUCER_FLOW_CONTROL);
-        String resumeLimit = getFlowResumeLimit(blockValue, resumeValue);
-        Map<String, String> context = Collections.singletonMap(org.apache.qpid.server.model.Queue.QUEUE_FLOW_RESUME_LIMIT, resumeLimit);
-        attributes.put(org.apache.qpid.server.model.Queue.CONTEXT, context);
-        _restTestHelper.submitRequest(queueUrl, "PUT", attributes);
-    }
-
-    private String getFlowResumeLimit(final double blockValue, final double 
resumeValue)
-    {
-        return String.format("%.2f", resumeValue / blockValue * 100.0);
-    }
-
-    private boolean isFlowStopped(final String queueUrl) throws IOException
-    {
-        Map<String, Object> queueAttributes2 = 
_restTestHelper.getJsonAsMap(queueUrl);
-        return (boolean) 
queueAttributes2.get(org.apache.qpid.server.model.Queue.QUEUE_FLOW_STOPPED);
-    }
-
-    public void testQueueDeleteWithBlockedFlow() throws Exception
-    {
-        init();
-        String queueName = getTestQueueName();
-        int capacity = _messageSizeIncludingHeader * 3 + 
_messageSizeIncludingHeader / 2;
-        int resumeCapacity = _messageSizeIncludingHeader * 2;
-        createAndBindQueueWithFlowControlEnabled(_producerSession, queueName, 
capacity, resumeCapacity, true, false);
-
-        _producer = _producerSession.createProducer(_queue);
-
-        // try to send 5 messages (should block after 4)
-        sendMessagesAsync(_producer, _producerSession, 5);
-
-        assertTrue("Flow is not stopped", awaitAttributeValue(queueName, 
"queueFlowStopped", true,5000));
-
-        assertEquals("Incorrect number of message sent before blocking", 4, 
_sentMessages.get());
-
-        if(!isBroker10())
-        {
-            // delete queue with a consumer session
-            ((AMQSession<?, ?>) _utilitySession).sendQueueDelete(queueName);
-        }
-        else
-        {
-            deleteEntityUsingAmqpManagement(getTestQueueName(), 
_utilitySession, "org.apache.qpid.Queue");
-            createTestQueue(_utilitySession);
-        }
-        _consumer = _consumerSession.createConsumer(_queue);
-        _consumerConnection.start();
-
-        Message message = _consumer.receive(1000l);
-        assertNull("Unexpected message", message);
-    }
-
-    private void createAndBindQueueWithFlowControlEnabled(Session session, 
String queueName, int capacity, int resumeCapacity) throws Exception
-    {
-        createAndBindQueueWithFlowControlEnabled(session, queueName, capacity, 
resumeCapacity, false, true);
-    }
-
-    private void createAndBindQueueWithFlowControlEnabled(Session session, 
String queueName, int capacity, int resumeCapacity, boolean durable, boolean 
autoDelete) throws Exception
-    {
-        if(isBroker10())
-        {
-            final Map<String, Object> attributes = new HashMap<>();
-            if (capacity != 0)
-            {
-                attributes.put(org.apache.qpid.server.model.Queue.CONTEXT,
-                               Collections.singletonMap(org.apache.qpid.server.model.Queue.QUEUE_FLOW_RESUME_LIMIT,
-                                                        getFlowResumeLimit(capacity, resumeCapacity)));
-            }
-            attributes.put(org.apache.qpid.server.model.Queue.MAXIMUM_QUEUE_DEPTH_BYTES, capacity);
-            attributes.put(org.apache.qpid.server.model.Queue.OVERFLOW_POLICY, 
OverflowPolicy.PRODUCER_FLOW_CONTROL);
-            attributes.put(org.apache.qpid.server.model.Queue.DURABLE, 
durable);
-            attributes.put(ConfiguredObject.LIFETIME_POLICY, autoDelete ? 
LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS.name() : 
LifetimePolicy.PERMANENT.name());
-            String queueUrl = String.format("queue/%1$s/%1$s/%2$s", 
TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, queueName);
-            _restTestHelper.submitRequest(queueUrl, "PUT", attributes, 201);
-            _queue = session.createQueue(queueName);
-        }
-        else
-        {
-            final Map<String, Object> arguments = new HashMap<String, 
Object>();
-            arguments.put("x-qpid-capacity", capacity);
-            arguments.put("x-qpid-flow-resume-capacity", resumeCapacity);
-            ((AMQSession<?, ?>) session).createQueue(queueName, autoDelete, 
durable, false, arguments);
-            _queue = session.createQueue("direct://amq.direct/"
-                                         + queueName
-                                         + "/"
-                                         + queueName
-                                         + "?durable='"
-                                         + durable
-                                         + "'&autodelete='"
-                                         + autoDelete
-                                         + "'");
-            ((AMQSession<?, ?>) session).declareAndBind((AMQDestination) 
_queue);
-        }
-    }
-
-    private MessageSender sendMessagesAsync(final MessageProducer producer,
-                                            final Session producerSession,
-                                            final int numMessages)
-    {
-        MessageSender sender = new MessageSender(producer, producerSession, 
numMessages);
-        new Thread(sender).start();
-        return sender;
-    }
-
-
-    private class MessageSender implements Runnable
-    {
-        private final MessageProducer _senderProducer;
-        private final Session _senderSession;
-        private final int _numMessages;
-        private volatile JMSException _exception;
-        private CountDownLatch _sendLatch = new CountDownLatch(1);
-
-        public MessageSender(MessageProducer producer, Session 
producerSession, int numMessages)
-        {
-            _senderProducer = producer;
-            _senderSession = producerSession;
-            _numMessages = numMessages;
-        }
-
-        @Override
-        public void run()
-        {
-            try
-            {
-                sendMessages(_senderProducer, _senderSession, _numMessages);
-            }
-            catch (JMSException e)
-            {
-                _exception = e;
-            }
-            finally
-            {
-                _sendLatch.countDown();
-            }
-        }
-
-        public CountDownLatch getSendLatch()
-        {
-            return _sendLatch;
-        }
-
-        private void sendMessages(MessageProducer producer, Session 
producerSession, int numMessages)
-                throws JMSException
-        {
-
-            for (int msg = 0; msg < numMessages; msg++)
-            {
-                producer.send(nextMessage(msg, producerSession));
-                _sentMessages.incrementAndGet();
-
-                // Cause work that causes a synchronous interaction on the wire.  We need to be
-                // sure that the client has received the flow/message.stop etc.
-                producerSession.createTemporaryQueue().delete();
-            }
-        }
-
-    }
-
-    private final byte[] BYTE_300 = new byte[300];
-
-    private Message nextMessage(int msg, Session producerSession) throws 
JMSException
-    {
-        BytesMessage send = producerSession.createBytesMessage();
-        send.writeBytes(BYTE_300);
-        send.setIntProperty("msg", msg);
-        return send;
-    }
-
-    private boolean awaitAttributeValue(String queueName, String 
attributeName, Object expectedValue, long timeout)
-            throws JMSException, InterruptedException
-    {
-        long startTime = System.currentTimeMillis();
-        long endTime = startTime + timeout;
-        boolean found = false;
-        do
-        {
-            Map<String, Object> attributes =
-                    managementReadObject(_utilitySession, 
"org.apache.qpid.SortedQueue", queueName, false);
-            Object actualValue = attributes.get(attributeName);
-            if (expectedValue == null)
-            {
-                found = actualValue == null;
-            }
-            else if (actualValue != null)
-            {
-                if (actualValue.getClass() == expectedValue.getClass())
-                {
-                    found = expectedValue.equals(actualValue);
-                }
-                else
-                {
-                    found = String.valueOf(expectedValue).equals(String.valueOf(actualValue));
-                }
-            }
-
-            if (!found)
-            {
-                Thread.sleep(50);
-            }
-        } while (!found && System.currentTimeMillis() <= endTime);
-        return found;
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/fbd973fd/test-profiles/CPPExcludes
----------------------------------------------------------------------
diff --git a/test-profiles/CPPExcludes b/test-profiles/CPPExcludes
index 1842076..c1a59a9 100755
--- a/test-profiles/CPPExcludes
+++ b/test-profiles/CPPExcludes
@@ -79,7 +79,6 @@ org.apache.qpid.server.queue.LiveQueueOperationsTest#*
 org.apache.qpid.test.client.message.SelectorTest#testRuntimeSelectorError
 
 //QPID-942 : Implemented Channel.Flow based Producer Side flow control to the Qpid Broker-J (not in CPP Broker)
-org.apache.qpid.server.queue.ProducerFlowControlTest#*
 org.apache.qpid.test.client.ProducerFlowControlTest#*
 
 //QPID-3986 : Flow control invoked on total store disk usage


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to