Repository: qpid-broker-j Updated Branches: refs/heads/master e3740879d -> 8d9ba1c47
QPID-7808: [Java Broker] [AMQP 0-10] Ensure IO thread calls ServerSession#receivedComplete() with the session's principal Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/8d9ba1c4 Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/8d9ba1c4 Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/8d9ba1c4 Branch: refs/heads/master Commit: 8d9ba1c4784deec4018da20cfe0362d649c80ad2 Parents: e374087 Author: Alex Rudyy <oru...@apache.org> Authored: Fri Jun 2 15:12:35 2017 +0100 Committer: Alex Rudyy <oru...@apache.org> Committed: Fri Jun 2 15:12:35 2017 +0100 ---------------------------------------------------------------------- .../server/protocol/v0_10/ServerSession.java | 16 +++--- .../server/queue/ProducerFlowControlTest.java | 53 ++++++++++++-------- 2 files changed, 41 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8d9ba1c4/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index 02b6a61..cd27cbc 100644 --- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -1605,14 +1605,18 @@ public class ServerSession extends SessionInvoker } - public void receivedComplete() + void receivedComplete() { - final Collection<ConsumerTarget_0_10> subscriptions = getSubscriptions(); - for (ConsumerTarget_0_10 subscription_0_10 : subscriptions) + runAsSubject(() -> { - subscription_0_10.flushCreditState(false); - } - awaitCommandCompletion(); + final Collection<ConsumerTarget_0_10> subscriptions = getSubscriptions(); + for (ConsumerTarget_0_10 subscription_0_10 : subscriptions) + { + subscription_0_10.flushCreditState(false); + } + awaitCommandCompletion(); + return null; + }); } public int getUnacknowledgedMessageCount() http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8d9ba1c4/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java ---------------------------------------------------------------------- diff --git a/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java b/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java index ca75e35..6b9cee7 100644 --- a/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java +++ b/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java @@ -74,6 +74,10 @@ public class ProducerFlowControlTest extends AbstractTestLogging super.setUp(); _restTestHelper = new RestTestHelper(getDefaultBroker().getHttpPort()); + } + + private void init() throws Exception + { _monitor.markDiscardPoint(); if (!isBroker10()) @@ -94,36 +98,16 @@ public class ProducerFlowControlTest extends AbstractTestLogging _utilitySession = utilityConnection.createSession(true, Session.SESSION_TRANSACTED); String tmpQueueName = getTestQueueName() + "_Tmp"; Queue tmpQueue = createTestQueue(_utilitySession, tmpQueueName); - MessageProducer tmpQueueProducer= _utilitySession.createProducer(tmpQueue); + MessageProducer tmpQueueProducer= _utilitySession.createProducer(tmpQueue); tmpQueueProducer.send(nextMessage(0, _utilitySession)); _utilitySession.commit(); _messageSizeIncludingHeader = getQueueDepthBytes(tmpQueueName); } - @Override - public void tearDown() throws Exception - { - try - { - try - { - _producerConnection.close(); - _consumerConnection.close(); - } - finally - { - _restTestHelper.tearDown(); - } - } - finally - { - super.tearDown(); - } - } - public void testCapacityExceededCausesBlock() throws Exception { + init(); String queueName = getTestQueueName(); int capacity = _messageSizeIncludingHeader * 3 + _messageSizeIncludingHeader / 2; @@ -156,6 +140,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging public void testBrokerLogMessages() throws Exception { + init(); String queueName = getTestQueueName(); int capacity = _messageSizeIncludingHeader * 3 + _messageSizeIncludingHeader / 2; @@ -184,6 +169,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging public void testFlowControlOnCapacityResumeEqual() throws Exception { + init(); String queueName = getTestQueueName(); int capacity = _messageSizeIncludingHeader * 3 + _messageSizeIncludingHeader / 2; @@ -216,6 +202,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging public void testFlowControlSoak() throws Exception { + init(); String queueName = getTestQueueName(); @@ -266,6 +253,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging public void testFlowControlAttributeModificationViaREST() throws Exception { + init(); String queueName = getTestQueueName(); createAndBindQueueWithFlowControlEnabled(_producerSession, queueName, 0, 0); @@ -323,6 +311,26 @@ public class ProducerFlowControlTest extends AbstractTestLogging assertNotNull("Should have received second message", _consumer.receive(RECEIVE_TIMEOUT)); } + public void testProducerFlowControlIsTriggeredOnEnqueueAsPartOfAsyncTransaction() throws Exception + { + long oneHourMilliseconds = 60 * 60 * 1000L; + setSystemProperty("virtualhost.housekeepingCheckPeriod", String.valueOf(oneHourMilliseconds)); + + restartDefaultBroker(); + Connection connection = getConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + String queueName = getTestQueueName(); + createAndBindQueueWithFlowControlEnabled(session, queueName, 1, 0, true, false); + + sendMessage(session, _queue, 1); + + String queueUrl = String.format("queue/%1$s/%1$s/%2$s", TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, queueName); + waitForFlowControlAndMessageCount(queueUrl, 1, 2000); + + assertTrue("Message flow is not stopped", isFlowStopped(queueUrl)); + } + private int getQueueDepthBytes(final String queueName) throws IOException { // On AMQP 1.0 the size of the message on the broker is not necessarily the size of the message we sent. Therefore, get the actual size from the broker @@ -365,6 +373,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging public void testQueueDeleteWithBlockedFlow() throws Exception { + init(); String queueName = getTestQueueName(); int capacity = _messageSizeIncludingHeader * 3 + _messageSizeIncludingHeader / 2; int resumeCapacity = _messageSizeIncludingHeader * 2; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org