Repository: qpid-broker-j Updated Branches: refs/heads/master cd432bcf6 -> fbd973fd4
QPID-6933: [System Tests] Refactor queue producer flow control overflow policy tests as JMS 1.1 system test Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/fbd973fd Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/fbd973fd Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/fbd973fd Branch: refs/heads/master Commit: fbd973fd4f62f9edd1edc76175df63dcfd83f80a Parents: cd432bc Author: Alex Rudyy <[email protected]> Authored: Fri Dec 29 19:24:43 2017 +0000 Committer: Alex Rudyy <[email protected]> Committed: Fri Dec 29 19:24:43 2017 +0000 ---------------------------------------------------------------------- .../org/apache/qpid/systests/JmsTestBase.java | 57 ++ .../queue/ProducerFlowControlTest.java | 582 +++++++++++++++++++ .../server/queue/ProducerFlowControlTest.java | 566 ------------------ test-profiles/CPPExcludes | 1 - 4 files changed, 639 insertions(+), 567 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/fbd973fd/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/JmsTestBase.java ---------------------------------------------------------------------- diff --git a/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/JmsTestBase.java b/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/JmsTestBase.java index 09177f5..a5c9553 100644 --- a/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/JmsTestBase.java +++ b/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/JmsTestBase.java @@ -201,6 +201,63 @@ public abstract class JmsTestBase extends BrokerAdminUsingTestBase return (Map<String, Object>) statistics; } + protected void updateEntityUsingAmqpManagement(final String entityName, + final String entityType, + final Map<String, Object> attributes) + throws Exception + { + Connection connection = getConnection(); + try + { + connection.start(); + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + _managementFacade.updateEntityUsingAmqpManagement(entityName, session, entityType, attributes); + } + finally + { + connection.close(); + } + } + + protected void deleteEntityUsingAmqpManagement(final String entityName, + final String entityType) + throws Exception + { + Connection connection = getConnection(); + try + { + connection.start(); + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + _managementFacade.deleteEntityUsingAmqpManagement(entityName, session, entityType); + } + finally + { + connection.close(); + } + } + + protected Map<String, Object> readEntityUsingAmqpManagement(String type, String name, boolean actuals) throws Exception + { + Connection connection = getConnection(); + try + { + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + try + { + return _managementFacade.readEntityUsingAmqpManagement(session, type, name, actuals); + } + finally + { + session.close(); + } + } + finally + { + connection.close(); + } + } + protected TopicConnection getTopicConnection() throws JMSException, NamingException { return (TopicConnection) getConnection(); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/fbd973fd/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/ProducerFlowControlTest.java ---------------------------------------------------------------------- diff --git a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/ProducerFlowControlTest.java b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/ProducerFlowControlTest.java new file mode 100644 index 0000000..6f76755 --- /dev/null +++ b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/ProducerFlowControlTest.java @@ -0,0 +1,582 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.systests.jms_1_1.extensions.queue; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.junit.Test; + +import org.apache.qpid.server.model.OverflowPolicy; +import org.apache.qpid.systests.JmsTestBase; + +public class ProducerFlowControlTest extends JmsTestBase +{ + + @Test + public void testCapacityExceededCausesBlock() throws Exception + { + String queueName = getTestName(); + int messageSize = evaluateMessageSize(); + int capacity = messageSize * 3 + messageSize / 2; + int resumeCapacity = messageSize * 2; + + Queue queue = createAndBindQueueWithFlowControlEnabled(queueName, capacity, resumeCapacity); + + Connection producerConnection = getConnectionBuilder().setSyncPublish(true).build(); + try + { + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = producerSession.createProducer(queue); + + // try to send 5 messages (should block after 4) + MessageSender messageSender = sendMessagesAsync(producer, producerSession, 5); + + assertTrue("Flow is not stopped", awaitAttributeValue(queueName, "queueFlowStopped", true, 5000)); + assertEquals("Incorrect number of message sent before blocking", + 4, + messageSender.getNumberOfSentMessages()); + + Connection consumerConnection = getConnection(); + try + { + Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = consumerSession.createConsumer(queue); + consumerConnection.start(); + + Message message = consumer.receive(getReceiveTimeout()); + assertNotNull("Message is not received", message); + + assertFalse("Flow is not stopped", awaitAttributeValue(queueName, "queueFlowStopped", false, 1000)); + + assertEquals("Message incorrectly sent after one message received", + 4, + messageSender.getNumberOfSentMessages()); + + Message message2 = consumer.receive(getReceiveTimeout()); + assertNotNull("Message is not received", message2); + assertTrue("Message sending is not finished", messageSender.getSendLatch() + .await(1000, TimeUnit.MILLISECONDS)); + assertEquals("Message not sent after two messages received", + 5, + messageSender.getNumberOfSentMessages()); + } + finally + { + consumerConnection.close(); + } + } + finally + { + producerConnection.close(); + } + } + + @Test + public void testFlowControlOnCapacityResumeEqual() throws Exception + { + String queueName = getTestName(); + int messageSize = evaluateMessageSize(); + int capacity = messageSize * 3 + messageSize / 2; + Queue queue = createAndBindQueueWithFlowControlEnabled(queueName, capacity, capacity); + + Connection producerConnection = getConnectionBuilder().setSyncPublish(true).build(); + try + { + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = producerSession.createProducer(queue); + + // try to send 5 messages (should block after 4) + MessageSender messageSender = sendMessagesAsync(producer, producerSession, 5); + + assertTrue("Flow is not stopped", awaitAttributeValue(queueName, "queueFlowStopped", true, 5000)); + + assertEquals("Incorrect number of message sent before blocking", + 4, + messageSender.getNumberOfSentMessages()); + + Connection consumerConnection = getConnection(); + try + { + Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = consumerSession.createConsumer(queue); + consumerConnection.start(); + + Message message = consumer.receive(getReceiveTimeout()); + assertNotNull("Message is not received", message); + + assertTrue("Message sending is not finished", + messageSender.getSendLatch().await(1000, TimeUnit.MILLISECONDS)); + + assertEquals("Message incorrectly sent after one message received", + 5, + messageSender.getNumberOfSentMessages()); + } + finally + { + consumerConnection.close(); + } + } + finally + { + producerConnection.close(); + } + } + + @Test + public void testFlowControlSoak() throws Exception + { + final String queueName = getTestName(); + int messageSize = evaluateMessageSize(); + int capacity = messageSize * 20; + final Queue queue = createAndBindQueueWithFlowControlEnabled(queueName, capacity, capacity / 2); + + final int numProducers = 10; + final int numMessages = 100; + + Connection consumerConnection = getConnection(); + try + { + Connection[] producerConnections = new Connection[numProducers]; + for (int i = 0; i < numProducers; i++) + { + producerConnections[i] = getConnection(); + } + try + { + AtomicInteger messageCounter = new AtomicInteger(); + for (int i = 0; i < numProducers; i++) + { + producerConnections[i].start(); + Session session = producerConnections[i].createSession(false, Session.AUTO_ACKNOWLEDGE); + sendMessagesAsync(session.createProducer(queue), session, numMessages, messageCounter); + } + + Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = consumerSession.createConsumer(queue); + consumerConnection.start(); + + for (int j = 0; j < numProducers * numMessages; j++) + { + + Message msg = consumer.receive(getReceiveTimeout()); + assertNotNull("Message not received(" + j + "), sent: " + messageCounter.get(), msg); + } + + Message msg = consumer.receive(getReceiveTimeout() / 4); + assertNull("extra message received", msg); + } + finally + { + for (int i = 0; i < numProducers; i++) + { + if (producerConnections[i] != null) + { + producerConnections[i].close(); + } + } + } + } + finally + { + consumerConnection.close(); + } + } + + @Test + public void testFlowControlAttributeModificationViaManagement() throws Exception + { + final String queueName = getTestName(); + int messageSize = evaluateMessageSize(); + final Queue queue = createAndBindQueueWithFlowControlEnabled(queueName, 0, 0); + + //set new values that will cause flow control to be active, and the queue to become overfull after 1 message is sent + setFlowLimits(queueName, messageSize / 2, messageSize / 2); + assertFalse("Queue should not be overfull", isFlowStopped(queueName)); + + Connection producerConnection = getConnectionBuilder().setSyncPublish(true).build(); + try + { + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = producerSession.createProducer(queue); + + // try to send 2 messages (should block after 1) + final MessageSender sender = sendMessagesAsync(producer, producerSession, 2); + + assertTrue("Flow is not stopped", awaitAttributeValue(queueName, "queueFlowStopped", true, 2000)); + + assertEquals("Incorrect number of message sent before blocking", 1, sender.getNumberOfSentMessages()); + assertTrue("Queue should be overfull", isFlowStopped(queueName)); + + int queueDepthBytes = getQueueDepthBytes(queueName); + //raise the attribute values, causing the queue to become underfull and allow the second message to be sent. + setFlowLimits(queueName, queueDepthBytes * 2 + queueDepthBytes / 2, queueDepthBytes + queueDepthBytes / 2); + + assertTrue("Flow is stopped", awaitAttributeValue(queueName, "queueFlowStopped", false, 2000)); + + //check second message was sent + assertEquals("Second message was not sent after lifting FlowResumeCapacity", + 2, + sender.getNumberOfSentMessages()); + assertFalse("Queue should not be overfull", isFlowStopped(queueName)); + + // try to send another message to block flow + final MessageSender sender2 = sendMessagesAsync(producer, producerSession, 1); + + assertTrue("Flow is stopped", awaitAttributeValue(queueName, "queueFlowStopped", true, 2000)); + assertEquals("Incorrect number of message sent before blocking", 1, sender2.getNumberOfSentMessages()); + assertTrue("Queue should be overfull", isFlowStopped(queueName)); + + Connection consumerConnection = getConnection(); + try + { + Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = consumerSession.createConsumer(queue); + consumerConnection.start(); + + Message message = consumer.receive(getReceiveTimeout()); + assertNotNull("Message is not received", message); + + message = consumer.receive(getReceiveTimeout()); + assertNotNull("Second message is not received", message); + + assertTrue("Flow is stopped", awaitAttributeValue(queueName, "queueFlowStopped", false, 2000)); + + assertNotNull("Should have received second message", consumer.receive(getReceiveTimeout())); + } + finally + { + consumerConnection.close(); + } + } + finally + { + producerConnection.close(); + } + } + + @Test + public void testProducerFlowControlIsTriggeredOnEnqueue() throws Exception + { + final String queueName = getTestName(); + final Queue queue = createAndBindQueueWithFlowControlEnabled(queueName, 1, 0); + + Connection producerConnection = getConnectionBuilder().setSyncPublish(true).build(); + try + { + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = producerSession.createProducer(queue); + + producer.send(nextMessage(0, producerSession)); + + // try to send 2 messages (should block after 1) + sendMessagesAsync(producer, producerSession, 2); + + assertTrue("Flow is not stopped", awaitAttributeValue(queueName, "queueFlowStopped", true, 2000)); + + Connection consumerConnection = getConnection(); + try + { + Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = consumerSession.createConsumer(queue); + consumerConnection.start(); + + Message message = consumer.receive(getReceiveTimeout()); + assertNotNull("Message is not received", message); + + message = consumer.receive(getReceiveTimeout()); + assertNotNull("Second message is not received", message); + } + finally + { + consumerConnection.close(); + } + } + finally + { + producerConnection.close(); + } + } + + @Test + public void testQueueDeleteWithBlockedFlow() throws Exception + { + final String queueName = getTestName(); + final int messageSize = evaluateMessageSize(); + final int capacity = messageSize * 3 + messageSize / 2; + final int resumeCapacity = messageSize * 2; + + final Queue queue = createAndBindQueueWithFlowControlEnabled(queueName, capacity, resumeCapacity); + + final Connection producerConnection = getConnectionBuilder().setSyncPublish(true).build(); + try + { + final Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final MessageProducer producer = producerSession.createProducer(queue); + + // try to send 5 messages (should block after 4) + final MessageSender sender = sendMessagesAsync(producer, producerSession, 5); + + assertTrue("Flow is not stopped", awaitAttributeValue(queueName, "queueFlowStopped", true, 5000)); + + assertEquals("Incorrect number of message sent before blocking", 4, sender.getNumberOfSentMessages()); + + deleteEntityUsingAmqpManagement(queueName, "org.apache.qpid.Queue"); + + createQueue(queueName); + + assertEquals("Unexpected queue depth", 0, getQueueDepthBytes(queueName)); + } + finally + { + producerConnection.close(); + } + } + + private int getQueueDepthBytes(final String queueName) throws Exception + { + Map<String, Object> arguments = + Collections.singletonMap("statistics", Collections.singletonList("queueDepthBytes")); + Object statistics = performOperationUsingAmqpManagement(queueName, + "getStatistics", + "org.apache.qpid.Queue", + arguments); + assertNotNull("Statistics is null", statistics); + assertTrue("Statistics is not map", statistics instanceof Map); + @SuppressWarnings("unchecked") + Map<String, Object> statisticsMap = (Map<String, Object>) statistics; + assertTrue("queueDepthBytes is not present", statisticsMap.get("queueDepthBytes") instanceof Number); + return ((Number) statisticsMap.get("queueDepthBytes")).intValue(); + } + + private void setFlowLimits(final String queueName, final int blockValue, final int resumeValue) throws Exception + { + final Map<String, Object> attributes = new HashMap<>(); + attributes.put(org.apache.qpid.server.model.Queue.MAXIMUM_QUEUE_DEPTH_BYTES, blockValue); + attributes.put(org.apache.qpid.server.model.Queue.OVERFLOW_POLICY, OverflowPolicy.PRODUCER_FLOW_CONTROL.name()); + String resumeLimit = getFlowResumeLimit(blockValue, resumeValue); + String context = String.format("{\"%s\": %s}", + org.apache.qpid.server.model.Queue.QUEUE_FLOW_RESUME_LIMIT, + resumeLimit); + attributes.put(org.apache.qpid.server.model.Queue.CONTEXT, context); + updateEntityUsingAmqpManagement(queueName, "org.apache.qpid.Queue", attributes); + } + + private String getFlowResumeLimit(final double maximumCapacity, final double resumeCapacity) + { + double ratio = resumeCapacity / maximumCapacity; + return String.format("%.2f", ratio * 100.0); + } + + private boolean isFlowStopped(final String queueName) throws Exception + { + Map<String, Object> attributes = readEntityUsingAmqpManagement("org.apache.qpid.Queue", queueName, false); + return Boolean.TRUE.equals(attributes.get("queueFlowStopped")); + } + + + private Queue createAndBindQueueWithFlowControlEnabled(String queueName, + int capacity, + int resumeCapacity) throws Exception + { + + final Map<String, Object> attributes = new HashMap<>(); + if (capacity != 0) + { + String flowResumeLimit = getFlowResumeLimit(capacity, resumeCapacity); + attributes.put(org.apache.qpid.server.model.Queue.CONTEXT, + String.format("{\"%s\": %s}", + org.apache.qpid.server.model.Queue.QUEUE_FLOW_RESUME_LIMIT, + flowResumeLimit)); + } + attributes.put(org.apache.qpid.server.model.Queue.MAXIMUM_QUEUE_DEPTH_BYTES, capacity); + attributes.put(org.apache.qpid.server.model.Queue.OVERFLOW_POLICY, OverflowPolicy.PRODUCER_FLOW_CONTROL.name()); + createEntityUsingAmqpManagement(queueName, "org.apache.qpid.Queue", attributes); + return createQueue(queueName); + } + + private MessageSender sendMessagesAsync(final MessageProducer producer, + final Session producerSession, + final int numMessages) + { + return sendMessagesAsync(producer, producerSession, numMessages, null); + } + + private MessageSender sendMessagesAsync(final MessageProducer producer, + final Session producerSession, + final int numMessages, + final AtomicInteger messageCounter) + { + MessageSender sender = new MessageSender(producer, producerSession, numMessages, messageCounter); + new Thread(sender).start(); + return sender; + } + + private final byte[] BYTE_300 = new byte[300]; + + private Message nextMessage(int msg, Session producerSession) throws JMSException + { + BytesMessage send = producerSession.createBytesMessage(); + send.writeBytes(BYTE_300); + send.setIntProperty("msg", msg); + return send; + } + + private boolean awaitAttributeValue(String queueName, String attributeName, Object expectedValue, long timeout) + throws Exception + { + long startTime = System.currentTimeMillis(); + long endTime = startTime + timeout; + boolean found = false; + do + { + Map<String, Object> attributes = readEntityUsingAmqpManagement("org.apache.qpid.Queue", queueName, false); + Object actualValue = attributes.get(attributeName); + if (expectedValue == null) + { + found = actualValue == null; + } + else if (actualValue != null) + { + if (actualValue.getClass() == expectedValue.getClass()) + { + found = expectedValue.equals(actualValue); + } + else + { + found = String.valueOf(expectedValue).equals(String.valueOf(actualValue)); + } + } + + if (!found) + { + Thread.sleep(50); + } + } while (!found && System.currentTimeMillis() <= endTime); + return found; + } + + private int evaluateMessageSize() throws Exception + { + String tmpQueueName = getTestName() + "_Tmp"; + Queue tmpQueue = createQueue(tmpQueueName); + final Connection connection = getConnection(); + try + { + connection.start(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer tmpQueueProducer = session.createProducer(tmpQueue); + tmpQueueProducer.send(nextMessage(0, session)); + session.commit(); + return getQueueDepthBytes(tmpQueueName); + } + finally + { + connection.close(); + } + } + + private class MessageSender implements Runnable + { + private final AtomicInteger _sentMessages; + private final MessageProducer _senderProducer; + private final Session _senderSession; + private final int _numMessages; + private volatile Exception _exception; + private CountDownLatch _sendLatch = new CountDownLatch(1); + + MessageSender(MessageProducer producer, Session producerSession, int numMessages, AtomicInteger messageCounter) + { + _senderProducer = producer; + _senderSession = producerSession; + _numMessages = numMessages; + _sentMessages = messageCounter == null ? new AtomicInteger(0) : messageCounter; + } + + @Override + public void run() + { + try + { + sendMessages(_senderProducer, _senderSession, _numMessages); + } + catch (Exception e) + { + _exception = e; + } + finally + { + _sendLatch.countDown(); + } + } + + CountDownLatch getSendLatch() + { + return _sendLatch; + } + + int getNumberOfSentMessages() + { + return _sentMessages.get(); + } + + Exception getException() + { + return _exception; + } + + private void sendMessages(MessageProducer producer, Session producerSession, int numMessages) + throws JMSException + { + + for (int msg = 0; msg < numMessages; msg++) + { + producer.send(nextMessage(msg, producerSession)); + _sentMessages.incrementAndGet(); + + // Cause work that causes a synchronous interaction on the wire. We need to be + // sure that the client has received the flow/message.stop etc. + producerSession.createTemporaryQueue().delete(); + } + } + } + + +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/fbd973fd/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java ---------------------------------------------------------------------- diff --git a/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java b/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java deleted file mode 100644 index 062479c..0000000 --- a/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java +++ /dev/null @@ -1,566 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.server.queue; - -import java.io.IOException; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import javax.jms.BytesMessage; -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.server.logging.AbstractTestLogging; -import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.model.LifetimePolicy; -import org.apache.qpid.server.model.OverflowPolicy; -import org.apache.qpid.systest.rest.RestTestHelper; -import org.apache.qpid.test.utils.TestBrokerConfiguration; - -public class ProducerFlowControlTest extends AbstractTestLogging -{ - private static final Logger LOGGER = LoggerFactory.getLogger(ProducerFlowControlTest.class); - - private Connection _producerConnection; - private Connection _consumerConnection; - private Session _producerSession; - private Session _consumerSession; - private MessageProducer _producer; - private MessageConsumer _consumer; - private Queue _queue; - private RestTestHelper _restTestHelper; - - private final AtomicInteger _sentMessages = new AtomicInteger(0); - private int _messageSizeIncludingHeader; - private Session _utilitySession; - - @Override - public void setUp() throws Exception - { - getDefaultBrokerConfiguration().addHttpManagementConfiguration(); - super.setUp(); - } - - @Override - public void startDefaultBroker() - { - // broker start-up is delegated to the tests - } - - private void init() throws Exception - { - super.startDefaultBroker(); - _restTestHelper = new RestTestHelper(getDefaultBroker().getHttpPort()); - _monitor.markDiscardPoint(); - - if (!isBroker10()) - { - setSystemProperty("sync_publish", "all"); - } - - _producerConnection = getConnection(); - _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - _producerConnection.start(); - - _consumerConnection = getConnection(); - _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - final Connection utilityConnection = getConnection(); - utilityConnection.start(); - _utilitySession = utilityConnection.createSession(true, Session.SESSION_TRANSACTED); - String tmpQueueName = getTestQueueName() + "_Tmp"; - Queue tmpQueue = createTestQueue(_utilitySession, tmpQueueName); - MessageProducer tmpQueueProducer= _utilitySession.createProducer(tmpQueue); - tmpQueueProducer.send(nextMessage(0, _utilitySession)); - _utilitySession.commit(); - - _messageSizeIncludingHeader = getQueueDepthBytes(tmpQueueName); - } - - public void testCapacityExceededCausesBlock() throws Exception - { - init(); - String queueName = getTestQueueName(); - - int capacity = _messageSizeIncludingHeader * 3 + _messageSizeIncludingHeader / 2; - int resumeCapacity = _messageSizeIncludingHeader * 2; - createAndBindQueueWithFlowControlEnabled(_producerSession, queueName, capacity, resumeCapacity); - _producer = _producerSession.createProducer(_queue); - - // try to send 5 messages (should block after 4) - CountDownLatch sendLatch = sendMessagesAsync(_producer, _producerSession, 5).getSendLatch(); - - assertTrue("Flow is not stopped", awaitAttributeValue(queueName, "queueFlowStopped", true, 5000)); - assertEquals("Incorrect number of message sent before blocking", 4, _sentMessages.get()); - - _consumer = _consumerSession.createConsumer(_queue); - _consumerConnection.start(); - - Message message = _consumer.receive(RECEIVE_TIMEOUT); - assertNotNull("Message is not received", message); - - assertFalse("Flow is not stopped", awaitAttributeValue(queueName, "queueFlowStopped", false, 1000)); - - assertEquals("Message incorrectly sent after one message received", 4, _sentMessages.get()); - - Message message2 = _consumer.receive(RECEIVE_TIMEOUT); - assertNotNull("Message is not received", message2); - assertTrue("Message sending is not finished", sendLatch.await(1000, TimeUnit.MILLISECONDS)); - assertEquals("Message not sent after two messages received", 5, _sentMessages.get()); - } - - - public void testBrokerLogMessages() throws Exception - { - init(); - String queueName = getTestQueueName(); - - int capacity = _messageSizeIncludingHeader * 3 + _messageSizeIncludingHeader / 2; - int resumeCapacity = _messageSizeIncludingHeader * 2; - - createAndBindQueueWithFlowControlEnabled(_producerSession, queueName, capacity, resumeCapacity); - _producer = _producerSession.createProducer(_queue); - - // try to send 5 messages (should block after 4) - sendMessagesAsync(_producer, _producerSession, 5); - - List<String> results = waitAndFindMatches("QUE-1003", 7000); - - assertEquals("Did not find correct number of QUE-1003 queue overfull messages", 1, results.size()); - - _consumer = _consumerSession.createConsumer(_queue); - _consumerConnection.start(); - - - while(_consumer.receive(1000) != null) {}; - - results = waitAndFindMatches("QUE-1004"); - - assertEquals("Did not find correct number of UNDERFULL queue underfull messages", 1, results.size()); - } - - public void testFlowControlOnCapacityResumeEqual() throws Exception - { - init(); - String queueName = getTestQueueName(); - - int capacity = _messageSizeIncludingHeader * 3 + _messageSizeIncludingHeader / 2; - createAndBindQueueWithFlowControlEnabled(_producerSession, queueName, - capacity, - capacity); - _producer = _producerSession.createProducer(_queue); - - - // try to send 5 messages (should block after 4) - CountDownLatch sendLatch = sendMessagesAsync(_producer, _producerSession, 5).getSendLatch(); - - assertTrue("Flow is not stopped", awaitAttributeValue(queueName, "queueFlowStopped", true,5000)); - - assertEquals("Incorrect number of message sent before blocking", 4, _sentMessages.get()); - - _consumer = _consumerSession.createConsumer(_queue); - _consumerConnection.start(); - - Message message = _consumer.receive(RECEIVE_TIMEOUT); - assertNotNull("Message is not received", message); - - assertTrue("Message sending is not finished", sendLatch.await(1000, TimeUnit.MILLISECONDS)); - - assertEquals("Message incorrectly sent after one message received", 5, _sentMessages.get()); - - - } - - - public void testFlowControlSoak() throws Exception - { - init(); - String queueName = getTestQueueName(); - - - final int numProducers = 10; - final int numMessages = 100; - - final int capacity = _messageSizeIncludingHeader * 20; - - createAndBindQueueWithFlowControlEnabled(_producerSession, queueName, capacity, capacity/2); - - _consumerConnection.start(); - - Connection[] producers = new Connection[numProducers]; - for(int i = 0 ; i < numProducers; i ++) - { - - producers[i] = getConnection(); - producers[i].start(); - Session session = producers[i].createSession(false, Session.AUTO_ACKNOWLEDGE); - - MessageProducer myproducer = session.createProducer(_queue); - MessageSender sender = sendMessagesAsync(myproducer, session, numMessages); - } - - _consumer = _consumerSession.createConsumer(_queue); - _consumerConnection.start(); - - for(int j = 0; j < numProducers * numMessages; j++) - { - - Message msg = _consumer.receive(5000); - assertNotNull("Message not received("+j+"), sent: "+_sentMessages.get(), msg); - - } - - - - Message msg = _consumer.receive(500); - assertNull("extra message received", msg); - - - for(int i = 0; i < numProducers; i++) - { - producers[i].close(); - } - - } - - public void testFlowControlAttributeModificationViaREST() throws Exception - { - init(); - String queueName = getTestQueueName(); - - createAndBindQueueWithFlowControlEnabled(_producerSession, queueName, 0, 0); - _producer = _producerSession.createProducer(_queue); - - String queueUrl = String.format("queue/%1$s/%1$s/%2$s", TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, queueName); - - //check current attribute values are 0 as expected - Map<String, Object> queueAttributes = _restTestHelper.getJsonAsMap(queueUrl); - assertEquals("Capacity was not the expected value", 0, - ((Number) queueAttributes.get(org.apache.qpid.server.model.Queue.MAXIMUM_QUEUE_DEPTH_BYTES)).intValue()); - - //set new values that will cause flow control to be active, and the queue to become overfull after 1 message is sent - setFlowLimits(queueUrl, 250, 250); - assertFalse("Queue should not be overfull", isFlowStopped(queueUrl)); - - // try to send 2 messages (should block after 1) - sendMessagesAsync(_producer, _producerSession, 2); - - waitForFlowControlAndMessageCount(queueUrl, 1, 2000); - - //check only 1 message was sent, and queue is overfull - assertEquals("Incorrect number of message sent before blocking", 1, _sentMessages.get()); - assertTrue("Queue should be overfull", isFlowStopped(queueUrl)); - - int queueDepthBytes = getQueueDepthBytes(queueName); - //raise the attribute values, causing the queue to become underfull and allow the second message to be sent. - setFlowLimits(queueUrl, queueDepthBytes + 200, queueDepthBytes); - - waitForFlowControlAndMessageCount(queueUrl, 2, 2000); - - //check second message was sent, and caused the queue to become overfull again - assertEquals("Second message was not sent after lifting FlowResumeCapacity", 2, _sentMessages.get()); - assertTrue("Queue should be overfull", isFlowStopped(queueUrl)); - - //raise capacity above queue depth, check queue remains overfull as FlowResumeCapacity still exceeded - setFlowLimits(queueUrl, 2 * queueDepthBytes + 100, queueDepthBytes); - assertTrue("Queue should be overfull", isFlowStopped(queueUrl)); - - //receive a message, check queue becomes underfull - - _consumer = _consumerSession.createConsumer(_queue); - _consumerConnection.start(); - - assertNotNull("Should have received first message", _consumer.receive(RECEIVE_TIMEOUT)); - - if(!isBroker10()) - { - //perform a synchronous op on the connection - ((AMQSession<?, ?>) _consumerSession).sync(); - } - - _restTestHelper.waitForAttributeChanged(queueUrl, org.apache.qpid.server.model.Queue.QUEUE_FLOW_STOPPED, false); - - assertNotNull("Should have received second message", _consumer.receive(RECEIVE_TIMEOUT)); - } - - public void testProducerFlowControlIsTriggeredOnEnqueue() throws Exception - { - long oneHourMilliseconds = 60 * 60 * 1000L; - setSystemProperty("virtualhost.housekeepingCheckPeriod", String.valueOf(oneHourMilliseconds)); - super.startDefaultBroker(); - _restTestHelper = new RestTestHelper(getDefaultBroker().getHttpPort()); - - Connection connection = getConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - String queueName = getTestQueueName(); - createAndBindQueueWithFlowControlEnabled(session, queueName, 1, 0, true, false); - - sendMessage(session, _queue, 1); - - String queueUrl = String.format("queue/%1$s/%1$s/%2$s", TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, queueName); - waitForFlowControlAndMessageCount(queueUrl, 1, 2000); - - assertTrue("Message flow is not stopped", isFlowStopped(queueUrl)); - } - - private int getQueueDepthBytes(final String queueName) throws IOException - { - // On AMQP 1.0 the size of the message on the broker is not necessarily the size of the message we sent. Therefore, get the actual size from the broker - final String requestUrl = String.format("queue/%1$s/%1$s/%2$s/getStatistics?statistics=[\"queueDepthBytes\"]", TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, queueName); - final Map<String, Object> queueAttributes = _restTestHelper.getJsonAsMap(requestUrl); - return ((Number) queueAttributes.get("queueDepthBytes")).intValue(); - } - - private void waitForFlowControlAndMessageCount(final String queueUrl, final int messageCount, final int timeout) throws InterruptedException, IOException - { - int timeWaited = 0; - while (timeWaited < timeout && (!isFlowStopped(queueUrl) || _sentMessages.get() != messageCount)) - { - Thread.sleep(50); - timeWaited += 50; - } - } - - private void setFlowLimits(final String queueUrl, final int blockValue, final int resumeValue) throws IOException - { - final Map<String, Object> attributes = new HashMap<>(); - attributes.put(org.apache.qpid.server.model.Queue.MAXIMUM_QUEUE_DEPTH_BYTES, blockValue); - attributes.put(org.apache.qpid.server.model.Queue.OVERFLOW_POLICY, OverflowPolicy.PRODUCER_FLOW_CONTROL); - String resumeLimit = getFlowResumeLimit(blockValue, resumeValue); - Map<String, String> context = Collections.singletonMap(org.apache.qpid.server.model.Queue.QUEUE_FLOW_RESUME_LIMIT, resumeLimit); - attributes.put(org.apache.qpid.server.model.Queue.CONTEXT, context); - _restTestHelper.submitRequest(queueUrl, "PUT", attributes); - } - - private String getFlowResumeLimit(final double blockValue, final double resumeValue) - { - return String.format("%.2f", resumeValue / blockValue * 100.0); - } - - private boolean isFlowStopped(final String queueUrl) throws IOException - { - Map<String, Object> queueAttributes2 = _restTestHelper.getJsonAsMap(queueUrl); - return (boolean) queueAttributes2.get(org.apache.qpid.server.model.Queue.QUEUE_FLOW_STOPPED); - } - - public void testQueueDeleteWithBlockedFlow() throws Exception - { - init(); - String queueName = getTestQueueName(); - int capacity = _messageSizeIncludingHeader * 3 + _messageSizeIncludingHeader / 2; - int resumeCapacity = _messageSizeIncludingHeader * 2; - createAndBindQueueWithFlowControlEnabled(_producerSession, queueName, capacity, resumeCapacity, true, false); - - _producer = _producerSession.createProducer(_queue); - - // try to send 5 messages (should block after 4) - sendMessagesAsync(_producer, _producerSession, 5); - - assertTrue("Flow is not stopped", awaitAttributeValue(queueName, "queueFlowStopped", true,5000)); - - assertEquals("Incorrect number of message sent before blocking", 4, _sentMessages.get()); - - if(!isBroker10()) - { - // delete queue with a consumer session - ((AMQSession<?, ?>) _utilitySession).sendQueueDelete(queueName); - } - else - { - deleteEntityUsingAmqpManagement(getTestQueueName(), _utilitySession, "org.apache.qpid.Queue"); - createTestQueue(_utilitySession); - } - _consumer = _consumerSession.createConsumer(_queue); - _consumerConnection.start(); - - Message message = _consumer.receive(1000l); - assertNull("Unexpected message", message); - } - - private void createAndBindQueueWithFlowControlEnabled(Session session, String queueName, int capacity, int resumeCapacity) throws Exception - { - createAndBindQueueWithFlowControlEnabled(session, queueName, capacity, resumeCapacity, false, true); - } - - private void createAndBindQueueWithFlowControlEnabled(Session session, String queueName, int capacity, int resumeCapacity, boolean durable, boolean autoDelete) throws Exception - { - if(isBroker10()) - { - final Map<String, Object> attributes = new HashMap<>(); - if (capacity != 0) - { - attributes.put(org.apache.qpid.server.model.Queue.CONTEXT, - Collections.singletonMap(org.apache.qpid.server.model.Queue.QUEUE_FLOW_RESUME_LIMIT, - getFlowResumeLimit(capacity, resumeCapacity))); - } - attributes.put(org.apache.qpid.server.model.Queue.MAXIMUM_QUEUE_DEPTH_BYTES, capacity); - attributes.put(org.apache.qpid.server.model.Queue.OVERFLOW_POLICY, OverflowPolicy.PRODUCER_FLOW_CONTROL); - attributes.put(org.apache.qpid.server.model.Queue.DURABLE, durable); - attributes.put(ConfiguredObject.LIFETIME_POLICY, autoDelete ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS.name() : LifetimePolicy.PERMANENT.name()); - String queueUrl = String.format("queue/%1$s/%1$s/%2$s", TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, queueName); - _restTestHelper.submitRequest(queueUrl, "PUT", attributes, 201); - _queue = session.createQueue(queueName); - } - else - { - final Map<String, Object> arguments = new HashMap<String, Object>(); - arguments.put("x-qpid-capacity", capacity); - arguments.put("x-qpid-flow-resume-capacity", resumeCapacity); - ((AMQSession<?, ?>) session).createQueue(queueName, autoDelete, durable, false, arguments); - _queue = session.createQueue("direct://amq.direct/" - + queueName - + "/" - + queueName - + "?durable='" - + durable - + "'&autodelete='" - + autoDelete - + "'"); - ((AMQSession<?, ?>) session).declareAndBind((AMQDestination) _queue); - } - } - - private MessageSender sendMessagesAsync(final MessageProducer producer, - final Session producerSession, - final int numMessages) - { - MessageSender sender = new MessageSender(producer, producerSession, numMessages); - new Thread(sender).start(); - return sender; - } - - - private class MessageSender implements Runnable - { - private final MessageProducer _senderProducer; - private final Session _senderSession; - private final int _numMessages; - private volatile JMSException _exception; - private CountDownLatch _sendLatch = new CountDownLatch(1); - - public MessageSender(MessageProducer producer, Session producerSession, int numMessages) - { - _senderProducer = producer; - _senderSession = producerSession; - _numMessages = numMessages; - } - - @Override - public void run() - { - try - { - sendMessages(_senderProducer, _senderSession, _numMessages); - } - catch (JMSException e) - { - _exception = e; - } - finally - { - _sendLatch.countDown(); - } - } - - public CountDownLatch getSendLatch() - { - return _sendLatch; - } - - private void sendMessages(MessageProducer producer, Session producerSession, int numMessages) - throws JMSException - { - - for (int msg = 0; msg < numMessages; msg++) - { - producer.send(nextMessage(msg, producerSession)); - _sentMessages.incrementAndGet(); - - // Cause work that causes a synchronous interaction on the wire. We need to be - // sure that the client has received the flow/message.stop etc. - producerSession.createTemporaryQueue().delete(); - } - } - - } - - private final byte[] BYTE_300 = new byte[300]; - - private Message nextMessage(int msg, Session producerSession) throws JMSException - { - BytesMessage send = producerSession.createBytesMessage(); - send.writeBytes(BYTE_300); - send.setIntProperty("msg", msg); - return send; - } - - private boolean awaitAttributeValue(String queueName, String attributeName, Object expectedValue, long timeout) - throws JMSException, InterruptedException - { - long startTime = System.currentTimeMillis(); - long endTime = startTime + timeout; - boolean found = false; - do - { - Map<String, Object> attributes = - managementReadObject(_utilitySession, "org.apache.qpid.SortedQueue", queueName, false); - Object actualValue = attributes.get(attributeName); - if (expectedValue == null) - { - found = actualValue == null; - } - else if (actualValue != null) - { - if (actualValue.getClass() == expectedValue.getClass()) - { - found = expectedValue.equals(actualValue); - } - else - { - found = String.valueOf(expectedValue).equals(String.valueOf(actualValue)); - } - } - - if (!found) - { - Thread.sleep(50); - } - } while (!found && System.currentTimeMillis() <= endTime); - return found; - } -} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/fbd973fd/test-profiles/CPPExcludes ---------------------------------------------------------------------- diff --git a/test-profiles/CPPExcludes b/test-profiles/CPPExcludes index 1842076..c1a59a9 100755 --- a/test-profiles/CPPExcludes +++ b/test-profiles/CPPExcludes @@ -79,7 +79,6 @@ org.apache.qpid.server.queue.LiveQueueOperationsTest#* org.apache.qpid.test.client.message.SelectorTest#testRuntimeSelectorError //QPID-942 : Implemented Channel.Flow based Producer Side flow control to the Qpid Broker-J (not in CPP Broker) -org.apache.qpid.server.queue.ProducerFlowControlTest#* org.apache.qpid.test.client.ProducerFlowControlTest#* //QPID-3986 : Flow control invoked on total store disk usage --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
