Author: kwall
Date: Thu Mar 26 23:44:39 2015
New Revision: 1669472
URL: http://svn.apache.org/r1669472
Log:
QPID-6466: [Java Client] Avoid possibilty that the dispatcher and IO thread can
reject/release the same message during consumer close
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1669472&r1=1669471&r2=1669472&view=diff
==============================================================================
---
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
(original)
+++
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Thu Mar 26 23:44:39 2015
@@ -890,16 +890,17 @@ public abstract class AMQSession<C exten
}
catch(TransportException e)
{
- throw toJMSException("Session exception occured while trying to
commit: " + e.getMessage(), e);
+ throw toJMSException("Session exception occurred while trying to
commit: " + e.getMessage(), e);
}
}
protected abstract void commitImpl() throws AMQException,
FailoverException, TransportException;
+
+
+
public void confirmConsumerCancelled(int consumerTag)
{
-
- // Remove the consumer from the map
C consumer = _consumers.get(consumerTag);
if (consumer != null)
{
@@ -917,7 +918,7 @@ public abstract class AMQSession<C exten
startDispatcherIfNecessary(true);
}
- _dispatcher.rejectPending(consumer);
+ rejectPending(consumer);
}
else // Queue Browser
{
@@ -947,6 +948,18 @@ public abstract class AMQSession<C exten
}
}
+ private void rejectPending(C consumer)
+ {
+ // Reject messages on pre-receive queue
+ consumer.rollbackPendingMessages();
+
+ // Reject messages on pre-dispatch queue
+ rejectMessagesForConsumerTag(consumer.getConsumerTag());
+
+ // closeConsumer
+ consumer.markClosed();
+ }
+
public QueueBrowser createBrowser(Queue queue) throws JMSException
{
if (isStrictAMQP())
@@ -3077,19 +3090,12 @@ public abstract class AMQSession<C exten
_producers.put(producerId, producer);
}
- /**
- * @param consumerTag The consumerTag to prune from queue or all if null
- * @param requeue Should the removed messages be requeued (or
discarded. Possibly to DLQ)
- * @param rejectAllConsumers
- */
-
- private void rejectMessagesForConsumerTag(int consumerTag, boolean
requeue, boolean rejectAllConsumers)
+ private void rejectMessagesForConsumerTag(int consumerTag)
{
Iterator<Dispatchable> messages = _queue.iterator();
if (_logger.isDebugEnabled())
{
- _logger.debug("Rejecting messages from _queue for Consumer tag(" +
consumerTag + ") (PDispatchQ) requeue:"
- + requeue);
+ _logger.debug("Rejecting messages from _queue for Consumer tag(" +
consumerTag + ")");
if (messages.hasNext())
{
@@ -3104,21 +3110,23 @@ public abstract class AMQSession<C exten
{
UnprocessedMessage message = (UnprocessedMessage) messages.next();
- if (rejectAllConsumers || (message.getConsumerTag() ==
consumerTag))
+ if (message.getConsumerTag() == consumerTag)
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Removing message(" +
System.identityHashCode(message) + ") from _queue DT:"
- + message.getDeliveryTag());
- }
- messages.remove();
+ if (_queue.remove(message))
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Removing message(" +
System.identityHashCode(message) + ") from _queue DT:"
+ + message.getDeliveryTag());
+ }
- rejectMessage(message, requeue);
+ rejectMessage(message, true);
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Rejected the message(" + message.toString()
+ ") for consumer :" + consumerTag);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rejected the message(" +
message.toString() + ") for consumer :" + consumerTag);
+ }
}
}
}
@@ -3288,18 +3296,6 @@ public abstract class AMQSession<C exten
return _closed;
}
- public void rejectPending(C consumer)
- {
- // Reject messages on pre-receive queue
- consumer.rollbackPendingMessages();
-
- // Reject messages on pre-dispatch queue
- rejectMessagesForConsumerTag(consumer.getConsumerTag(), true,
false);
-
- // closeConsumer
- consumer.markClosed();
-
- }
public void rollback()
{
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java?rev=1669472&r1=1669471&r2=1669472&view=diff
==============================================================================
---
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
(original)
+++
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
Thu Mar 26 23:44:39 2015
@@ -59,6 +59,7 @@ public class FlowControllingBlockingQueu
return _queue.isEmpty();
}
+
public interface ThresholdListener
{
void aboveThreshold(int currentValue);
@@ -104,14 +105,7 @@ public class FlowControllingBlockingQueu
if (o != null && !disableFlowControl && _listener != null)
{
- synchronized (_listener)
- {
- if (_count-- == _flowControlLowThreshold)
- {
- _listener.underThreshold(_count);
- }
- }
-
+ reportBelowIfNecessary();
}
return o;
@@ -132,14 +126,7 @@ public class FlowControllingBlockingQueu
}
if (!disableFlowControl && _listener != null)
{
- synchronized (_listener)
- {
- if (_count-- == _flowControlLowThreshold)
- {
- _listener.underThreshold(_count);
- }
- }
-
+ reportBelowIfNecessary();
}
return o;
@@ -155,18 +142,44 @@ public class FlowControllingBlockingQueu
}
if (!disableFlowControl && _listener != null)
{
- synchronized (_listener)
- {
- if (++_count == _flowControlHighThreshold)
- {
- _listener.aboveThreshold(_count);
- }
- }
+ reportAboveIfNecessary();
}
}
+ public boolean remove(final T o)
+ {
+ final boolean removed = _queue.remove(o);
+ if (removed && !disableFlowControl && _listener != null)
+ {
+ reportBelowIfNecessary();
+ }
+ return removed;
+ }
+
public Iterator<T> iterator()
{
return _queue.iterator();
}
+
+ private void reportAboveIfNecessary()
+ {
+ synchronized (_listener)
+ {
+ if (++_count == _flowControlHighThreshold)
+ {
+ _listener.aboveThreshold(_count);
+ }
+ }
+ }
+
+ private void reportBelowIfNecessary()
+ {
+ synchronized (_listener)
+ {
+ if (_count-- == _flowControlLowThreshold)
+ {
+ _listener.underThreshold(_count);
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]