Author: kwall
Date: Mon May 9 09:20:06 2016
New Revision: 1742900
URL: http://svn.apache.org/viewvc?rev=1742900&view=rev
Log:
QPID-7237: [Java Client] Use single thread thread-pool to perform flow control
on no-ack sessions
* Avoids spawning new thread for each state change
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1742900&r1=1742899&r2=1742900&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
(original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
Mon May 9 09:20:06 2016
@@ -28,6 +28,10 @@ import java.util.concurrent.ConcurrentHa
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -293,6 +297,8 @@ public abstract class AMQSession<C exten
return _messageFactoryRegistry;
}
+ private final ExecutorService _flowControlNoAckTaskPool;
+
/**
* Consumers associated with this session
*/
@@ -360,73 +366,77 @@ public abstract class AMQSession<C exten
if (_acknowledgeMode == NO_ACKNOWLEDGE)
{
- _queue =
- new
FlowControllingBlockingQueue<Dispatchable>(_prefetchHighMark, _prefetchLowMark,
- new
FlowControllingBlockingQueue.ThresholdListener()
- {
- private final
AtomicBoolean _suspendState = new AtomicBoolean();
-
- public void
aboveThreshold(int currentValue)
- {
- // If the session
has been closed don't waste time creating a thread to do
- // flow control
- if
(!(AMQSession.this.isClosed() || AMQSession.this.isClosing()))
- {
- // Only
execute change if previous state
- // was False
- if
(!_suspendState.getAndSet(true))
- {
- if
(_logger.isDebugEnabled())
- {
-
_logger.debug(
-
"Above threshold(" + _prefetchHighMark
-
+ ") so suspending channel. Current value is " + currentValue);
- }
- try
- {
-
Threading.getThreadFactory().createThread(new
SuspenderRunner(_suspendState)).start();
- }
- catch
(Exception e)
- {
- throw
new RuntimeException("Failed to create thread", e);
- }
- }
- }
- }
-
- public void
underThreshold(int currentValue)
- {
- // If the session
has been closed don't waste time creating a thread to do
- // flow control
- if
(!(AMQSession.this.isClosed() || AMQSession.this.isClosing()))
- {
- // Only
execute change if previous state
- // was true
- if
(_suspendState.getAndSet(false))
- {
- if
(_logger.isDebugEnabled())
- {
-
-
_logger.debug(
-
"Below threshold(" + _prefetchLowMark
-
+ ") so unsuspending channel. Current value is " + currentValue);
- }
- try
- {
-
Threading.getThreadFactory().createThread(new
SuspenderRunner(_suspendState)).start();
- }
- catch
(Exception e)
- {
- throw
new RuntimeException("Failed to create thread", e);
- }
- }
- }
- }
- });
+ _flowControlNoAckTaskPool = Executors.newSingleThreadExecutor(new
ThreadFactory()
+ {
+ @Override
+ public Thread newThread(final Runnable r)
+ {
+ Thread thread = new Thread(r, "Connection_" +
_connection.getConnectionNumber() + "_session_" + _channelId);
+ if (!thread.isDaemon())
+ {
+ thread.setDaemon(true);
+ }
+
+ return thread;
+ }
+ });
+
+ final FlowControllingBlockingQueue.ThresholdListener listener =
+ new FlowControllingBlockingQueue.ThresholdListener()
+ {
+ private final AtomicBoolean _suspendState = new
AtomicBoolean();
+
+ public void aboveThreshold(int currentValue)
+ {
+ if (!(AMQSession.this.isClosed() ||
AMQSession.this.isClosing()))
+ {
+ // Only execute change if previous state was
false
+ if (!_suspendState.getAndSet(true))
+ {
+ _logger.debug(
+ "Above threshold ({}) so
suspending channel. Current value is {}",
+ _prefetchHighMark,
+ currentValue);
+
+ doSuspend();
+ }
+ }
+ }
+
+ public void underThreshold(int currentValue)
+ {
+ if (!(AMQSession.this.isClosed() ||
AMQSession.this.isClosing()))
+ {
+ // Only execute change if previous state was
true
+ if (_suspendState.getAndSet(false))
+ {
+ _logger.debug(
+ "Below threshold ({}) so
unsuspending channel. Current value is {}",
+ _prefetchLowMark,
+ currentValue);
+ doSuspend();
+ }
+ }
+ }
+
+ private void doSuspend()
+ {
+ try
+ {
+ _flowControlNoAckTaskPool.execute(new
SuspenderRunner(_suspendState));
+ }
+ catch (RejectedExecutionException e)
+ {
+ // Ignore - session must be closing/closed.
+ }
+ }
+ };
+ _queue = new FlowControllingBlockingQueue<>(_prefetchHighMark,
_prefetchLowMark, listener);
}
else
{
- _queue = new
FlowControllingBlockingQueue<Dispatchable>(_prefetchHighMark, null);
+ _flowControlNoAckTaskPool = null;
+ _queue = new FlowControllingBlockingQueue<>(_prefetchHighMark,
null);
}
// Add creation logging to tie in with the existing close logging
@@ -775,6 +785,7 @@ public abstract class AMQSession<C exten
}
finally
{
+ shutdownFlowControlNoAckTaskPool();
_connection.deregisterSession(_channelId);
}
}
@@ -794,7 +805,7 @@ public abstract class AMQSession<C exten
// with a null cause
// When we are closing the Session due to a protocol session error we
simply create a new AMQException
// with the correct error code and text this is cleary WRONG as the
instanceof check below will fail.
- // We need to determin here if the connection should be
+ // We need to determine here if the connection should be
if (e instanceof AMQDisconnectedException)
{
@@ -822,6 +833,7 @@ public abstract class AMQSession<C exten
_connection.deregisterSession(_channelId);
closeProducersAndConsumers(amqe);
+ shutdownFlowControlNoAckTaskPool();
}
}
@@ -3191,7 +3203,7 @@ public abstract class AMQSession<C exten
* @throws QpidException If the session cannot be suspended for any reason.
* TODO Be aware of possible changes to parameter order as versions
change.
*/
- protected void suspendChannel(boolean suspend) throws QpidException // ,
FailoverException
+ protected void suspendChannel(boolean suspend) throws QpidException
{
synchronized (_suspensionLock)
{
@@ -3648,7 +3660,7 @@ public abstract class AMQSession<C exten
synchronized (_suspensionLock)
{
// If the session has closed by the time we get here
- // then we should not attempt to write to the
sesion/channel.
+ // then we should not attempt to write to the
session/channel.
if (!(AMQSession.this.isClosed() ||
AMQSession.this.isClosing()))
{
suspendChannel(_suspend.get());
@@ -3797,5 +3809,14 @@ public abstract class AMQSession<C exten
{
_queue.clear();
}
+
+ private void shutdownFlowControlNoAckTaskPool()
+ {
+ if (_flowControlNoAckTaskPool != null)
+ {
+ _flowControlNoAckTaskPool.shutdown();
+ }
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]