Author: orudyy
Date: Wed May 11 14:39:28 2016
New Revision: 1743383

URL: http://svn.apache.org/viewvc?rev=1743383&view=rev
Log:
QPID-7237: Coalesce flow control tasks for no-ack sessions, and set the lower
prefetch threshold to half of the upper prefetch threshold when the lower and
upper thresholds are specified with the same value

Modified:
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
    
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java

Modified: 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1743383&r1=1743382&r2=1743383&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java 
(original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java 
Wed May 11 14:39:28 2016
@@ -29,9 +29,9 @@ import java.util.concurrent.ConcurrentLi
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -161,10 +161,10 @@ public abstract class AMQSession<C exten
     private int _ticket;
 
     /** Holds the high mark for prefetched message, at which the session is 
suspended. */
-    private int _prefetchHighMark;
+    private final int _prefetchHighMark;
 
     /** Holds the low mark for prefetched messages, below which the session is 
resumed. */
-    private int _prefetchLowMark;
+    private final int _prefetchLowMark;
 
     /** Holds the message listener, if any, which is attached to this session. 
*/
     private MessageListener _messageListener = null;
@@ -364,12 +364,20 @@ public abstract class AMQSession<C exten
         _messageEncryptionHelper = new MessageEncryptionHelper(this);
         _channelId = channelId;
         _messageFactoryRegistry = 
MessageFactoryRegistry.newDefaultRegistry(this);
-        _prefetchHighMark = defaultPrefetchHighMark;
-        _prefetchLowMark = defaultPrefetchLowMark;
 
         if (_acknowledgeMode == NO_ACKNOWLEDGE)
         {
-            _flowControlNoAckTaskPool = Executors.newSingleThreadExecutor(new 
ThreadFactory()
+            _prefetchHighMark = defaultPrefetchHighMark;
+            _prefetchLowMark = defaultPrefetchLowMark == 
defaultPrefetchHighMark && defaultPrefetchHighMark > 0
+                   ? Math.max(defaultPrefetchHighMark / 2, 1)
+                    : defaultPrefetchLowMark;
+
+            // we coalesce suspend jobs using single threaded pool executor 
with queue length of one
+            // and discarding policy
+            _flowControlNoAckTaskPool = new ThreadPoolExecutor(1, 1,
+                                                               0L, 
TimeUnit.MILLISECONDS,
+                                                               new 
LinkedBlockingQueue<Runnable>(1),
+                                                               new 
ThreadFactory()
             {
                 @Override
                 public Thread newThread(final Runnable r)
@@ -382,7 +390,7 @@ public abstract class AMQSession<C exten
 
                     return thread;
                 }
-            });
+            }, new ThreadPoolExecutor.DiscardPolicy());
 
             final FlowControllingBlockingQueue.ThresholdListener listener =
                     new FlowControllingBlockingQueue.ThresholdListener()
@@ -424,20 +432,15 @@ public abstract class AMQSession<C exten
 
                         private void doSuspend()
                         {
-                            try
-                            {
-                                _flowControlNoAckTaskPool.execute(new 
SuspenderRunner(_suspendState));
-                            }
-                            catch (RejectedExecutionException e)
-                            {
-                                // Ignore - session must be closing/closed.
-                            }
+                            _flowControlNoAckTaskPool.execute(new 
SuspenderRunner(_suspendState));
                         }
                     };
             _queue = new FlowControllingBlockingQueue<>(_prefetchHighMark, 
_prefetchLowMark, listener);
         }
         else
         {
+            _prefetchHighMark = defaultPrefetchHighMark;
+            _prefetchLowMark = defaultPrefetchLowMark;
             _flowControlNoAckTaskPool = null;
             _queue = new FlowControllingBlockingQueue<>(_prefetchHighMark, 
null);
         }

Modified: 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java?rev=1743383&r1=1743382&r2=1743383&view=diff
==============================================================================
--- 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
 (original)
+++ 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
 Wed May 11 14:39:28 2016
@@ -88,9 +88,21 @@ public class FlowControllingBlockingQueu
         _flowControlHighThreshold = highThreshold;
         _flowControlLowThreshold = lowThreshold;
         _listener = listener;
-        if (highThreshold == 0)
+        if (highThreshold <= 0)
         {
-               disableFlowControl = true;
+            disableFlowControl = true;
+        }
+        else if (lowThreshold > highThreshold)
+        {
+            throw new IllegalArgumentException(String.format(
+                    "Invalid low threshold %d : it should be less or equal 
high threshold %d",
+                    lowThreshold,
+                    highThreshold));
+        }
+        else if (lowThreshold < 1)
+        {
+            throw new IllegalArgumentException(String.format("Invalid low 
threshold %d: it should be greater than 0",
+                                                             lowThreshold));
         }
     }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to