Repository: qpid-broker-j
Updated Branches:
  refs/heads/master e3740879d -> 8d9ba1c47


QPID-7808: [Java Broker] [AMQP 0-10] Ensure IO thread calls ServerSession#receivedComplete() with the session's principal


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/8d9ba1c4
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/8d9ba1c4
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/8d9ba1c4

Branch: refs/heads/master
Commit: 8d9ba1c4784deec4018da20cfe0362d649c80ad2
Parents: e374087
Author: Alex Rudyy <oru...@apache.org>
Authored: Fri Jun 2 15:12:35 2017 +0100
Committer: Alex Rudyy <oru...@apache.org>
Committed: Fri Jun 2 15:12:35 2017 +0100

----------------------------------------------------------------------
 .../server/protocol/v0_10/ServerSession.java    | 16 +++---
 .../server/queue/ProducerFlowControlTest.java   | 53 ++++++++++++--------
 2 files changed, 41 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8d9ba1c4/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 02b6a61..cd27cbc 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -1605,14 +1605,18 @@ public class ServerSession extends SessionInvoker
     }
 
 
-    public void receivedComplete()
+    void receivedComplete()
     {
-        final Collection<ConsumerTarget_0_10> subscriptions = 
getSubscriptions();
-        for (ConsumerTarget_0_10 subscription_0_10 : subscriptions)
+        runAsSubject(() ->
         {
-            subscription_0_10.flushCreditState(false);
-        }
-        awaitCommandCompletion();
+            final Collection<ConsumerTarget_0_10> subscriptions = 
getSubscriptions();
+            for (ConsumerTarget_0_10 subscription_0_10 : subscriptions)
+            {
+                subscription_0_10.flushCreditState(false);
+            }
+            awaitCommandCompletion();
+            return null;
+        });
     }
 
     public int getUnacknowledgedMessageCount()

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8d9ba1c4/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java b/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
index ca75e35..6b9cee7 100644
--- a/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
+++ b/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
@@ -74,6 +74,10 @@ public class ProducerFlowControlTest extends AbstractTestLogging
         super.setUp();
 
         _restTestHelper = new RestTestHelper(getDefaultBroker().getHttpPort());
+    }
+
+    private void init() throws Exception
+    {
         _monitor.markDiscardPoint();
 
         if (!isBroker10())
@@ -94,36 +98,16 @@ public class ProducerFlowControlTest extends AbstractTestLogging
         _utilitySession = utilityConnection.createSession(true, 
Session.SESSION_TRANSACTED);
         String tmpQueueName = getTestQueueName() + "_Tmp";
         Queue tmpQueue = createTestQueue(_utilitySession, tmpQueueName);
-        MessageProducer  tmpQueueProducer= 
_utilitySession.createProducer(tmpQueue);
+        MessageProducer tmpQueueProducer= 
_utilitySession.createProducer(tmpQueue);
         tmpQueueProducer.send(nextMessage(0, _utilitySession));
         _utilitySession.commit();
 
         _messageSizeIncludingHeader = getQueueDepthBytes(tmpQueueName);
     }
 
-    @Override
-    public void tearDown() throws Exception
-    {
-        try
-        {
-            try
-            {
-                _producerConnection.close();
-                _consumerConnection.close();
-            }
-            finally
-            {
-                _restTestHelper.tearDown();
-            }
-        }
-        finally
-        {
-            super.tearDown();
-        }
-    }
-
     public void testCapacityExceededCausesBlock() throws Exception
     {
+        init();
         String queueName = getTestQueueName();
 
         int capacity = _messageSizeIncludingHeader * 3 + 
_messageSizeIncludingHeader / 2;
@@ -156,6 +140,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging
 
     public void testBrokerLogMessages() throws Exception
     {
+        init();
         String queueName = getTestQueueName();
 
         int capacity = _messageSizeIncludingHeader * 3 + 
_messageSizeIncludingHeader / 2;
@@ -184,6 +169,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging
 
     public void testFlowControlOnCapacityResumeEqual() throws Exception
     {
+        init();
         String queueName = getTestQueueName();
 
         int capacity = _messageSizeIncludingHeader * 3 + 
_messageSizeIncludingHeader / 2;
@@ -216,6 +202,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging
 
     public void testFlowControlSoak() throws Exception
     {
+        init();
         String queueName = getTestQueueName();
         
 
@@ -266,6 +253,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging
 
     public void testFlowControlAttributeModificationViaREST() throws Exception
     {
+        init();
         String queueName = getTestQueueName();
 
         createAndBindQueueWithFlowControlEnabled(_producerSession, queueName, 
0, 0);
@@ -323,6 +311,26 @@ public class ProducerFlowControlTest extends AbstractTestLogging
         assertNotNull("Should have received second message", 
_consumer.receive(RECEIVE_TIMEOUT));
     }
 
+    public void 
testProducerFlowControlIsTriggeredOnEnqueueAsPartOfAsyncTransaction() throws 
Exception
+    {
+        long oneHourMilliseconds = 60 * 60 * 1000L;
+        setSystemProperty("virtualhost.housekeepingCheckPeriod", 
String.valueOf(oneHourMilliseconds));
+
+        restartDefaultBroker();
+        Connection connection = getConnection();
+        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+        String queueName = getTestQueueName();
+        createAndBindQueueWithFlowControlEnabled(session, queueName, 1, 0, 
true, false);
+
+        sendMessage(session, _queue, 1);
+
+        String queueUrl = String.format("queue/%1$s/%1$s/%2$s", 
TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, queueName);
+        waitForFlowControlAndMessageCount(queueUrl, 1, 2000);
+
+        assertTrue("Message flow is not stopped", isFlowStopped(queueUrl));
+    }
+
     private int getQueueDepthBytes(final String queueName) throws IOException
     {
         // On AMQP 1.0 the size of the message on the broker is not necessarily the size of the message we sent. Therefore, get the actual size from the broker
@@ -365,6 +373,7 @@ public class ProducerFlowControlTest extends AbstractTestLogging
 
     public void testQueueDeleteWithBlockedFlow() throws Exception
     {
+        init();
         String queueName = getTestQueueName();
         int capacity = _messageSizeIncludingHeader * 3 + 
_messageSizeIncludingHeader / 2;
         int resumeCapacity = _messageSizeIncludingHeader * 2;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to