Author: kwall
Date: Thu Mar 26 23:44:39 2015
New Revision: 1669472

URL: http://svn.apache.org/r1669472
Log:
QPID-6466: [Java Client] Avoid possibility that the dispatcher and IO thread can 
reject/release the same message during consumer close

Modified:
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1669472&r1=1669471&r2=1669472&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Thu Mar 26 23:44:39 2015
@@ -890,16 +890,17 @@ public abstract class AMQSession<C exten
         }
         catch(TransportException e)
         {
-            throw toJMSException("Session exception occured while trying to 
commit: " + e.getMessage(), e);
+            throw toJMSException("Session exception occurred while trying to 
commit: " + e.getMessage(), e);
         }
     }
 
     protected abstract void commitImpl() throws AMQException, 
FailoverException, TransportException;
 
+
+
+
     public void confirmConsumerCancelled(int consumerTag)
     {
-
-        // Remove the consumer from the map
         C consumer = _consumers.get(consumerTag);
         if (consumer != null)
         {
@@ -917,7 +918,7 @@ public abstract class AMQSession<C exten
                     startDispatcherIfNecessary(true);
                 }
 
-                _dispatcher.rejectPending(consumer);
+                rejectPending(consumer);
             }
             else // Queue Browser
             {
@@ -947,6 +948,18 @@ public abstract class AMQSession<C exten
         }
     }
 
+    private void rejectPending(C consumer)
+    {
+        // Reject messages on pre-receive queue
+        consumer.rollbackPendingMessages();
+
+        // Reject messages on pre-dispatch queue
+        rejectMessagesForConsumerTag(consumer.getConsumerTag());
+
+        // closeConsumer
+        consumer.markClosed();
+    }
+
     public QueueBrowser createBrowser(Queue queue) throws JMSException
     {
         if (isStrictAMQP())
@@ -3077,19 +3090,12 @@ public abstract class AMQSession<C exten
         _producers.put(producerId, producer);
     }
 
-    /**
-     * @param consumerTag The consumerTag to prune from queue or all if null
-     * @param requeue     Should the removed messages be requeued (or 
discarded. Possibly to DLQ)
-     * @param rejectAllConsumers
-     */
-
-    private void rejectMessagesForConsumerTag(int consumerTag, boolean 
requeue, boolean rejectAllConsumers)
+    private void rejectMessagesForConsumerTag(int consumerTag)
     {
         Iterator<Dispatchable> messages = _queue.iterator();
         if (_logger.isDebugEnabled())
         {
-            _logger.debug("Rejecting messages from _queue for Consumer tag(" + 
consumerTag + ") (PDispatchQ) requeue:"
-                         + requeue);
+            _logger.debug("Rejecting messages from _queue for Consumer tag(" + 
consumerTag + ")");
 
             if (messages.hasNext())
             {
@@ -3104,21 +3110,23 @@ public abstract class AMQSession<C exten
         {
             UnprocessedMessage message = (UnprocessedMessage) messages.next();
 
-            if (rejectAllConsumers || (message.getConsumerTag() == 
consumerTag))
+            if (message.getConsumerTag() == consumerTag)
             {
-                if (_logger.isDebugEnabled())
-                {
-                    _logger.debug("Removing message(" + 
System.identityHashCode(message) + ") from _queue DT:"
-                                  + message.getDeliveryTag());
-                }
 
-                messages.remove();
+                if (_queue.remove(message))
+                {
+                    if (_logger.isDebugEnabled())
+                    {
+                        _logger.debug("Removing message(" + 
System.identityHashCode(message) + ") from _queue DT:"
+                                      + message.getDeliveryTag());
+                    }
 
-                rejectMessage(message, requeue);
+                    rejectMessage(message, true);
 
-                if (_logger.isDebugEnabled())
-                {
-                    _logger.debug("Rejected the message(" + message.toString() 
+ ") for consumer :" + consumerTag);
+                    if (_logger.isDebugEnabled())
+                    {
+                        _logger.debug("Rejected the message(" + 
message.toString() + ") for consumer :" + consumerTag);
+                    }
                 }
             }
         }
@@ -3288,18 +3296,6 @@ public abstract class AMQSession<C exten
             return _closed;
         }
 
-        public void rejectPending(C consumer)
-        {
-            // Reject messages on pre-receive queue
-            consumer.rollbackPendingMessages();
-
-            // Reject messages on pre-dispatch queue
-            rejectMessagesForConsumerTag(consumer.getConsumerTag(), true, 
false);
-
-            // closeConsumer
-            consumer.markClosed();
-
-        }
 
         public void rollback()
         {

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java?rev=1669472&r1=1669471&r2=1669472&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
 Thu Mar 26 23:44:39 2015
@@ -59,6 +59,7 @@ public class FlowControllingBlockingQueu
         return _queue.isEmpty();
     }
 
+
     public interface ThresholdListener
     {
         void aboveThreshold(int currentValue);
@@ -104,14 +105,7 @@ public class FlowControllingBlockingQueu
 
         if (o != null && !disableFlowControl && _listener != null)
         {
-            synchronized (_listener)
-            {
-                if (_count-- == _flowControlLowThreshold)
-                {
-                    _listener.underThreshold(_count);
-                }
-            }
-
+            reportBelowIfNecessary();
         }
 
         return o;
@@ -132,14 +126,7 @@ public class FlowControllingBlockingQueu
         }
         if (!disableFlowControl && _listener != null)
         {
-            synchronized (_listener)
-            {
-                if (_count-- == _flowControlLowThreshold)
-                {
-                    _listener.underThreshold(_count);
-                }
-            }
-            
+            reportBelowIfNecessary();
         }
 
         return o;
@@ -155,18 +142,44 @@ public class FlowControllingBlockingQueu
         }
         if (!disableFlowControl && _listener != null)
         {
-            synchronized (_listener)
-            {
-                if (++_count == _flowControlHighThreshold)
-                {
-                    _listener.aboveThreshold(_count);
-                }
-            }
+            reportAboveIfNecessary();
         }
     }
 
+    public boolean remove(final T o)
+    {
+        final boolean removed = _queue.remove(o);
+        if (removed && !disableFlowControl && _listener != null)
+        {
+            reportBelowIfNecessary();
+        }
+        return removed;
+    }
+
     public Iterator<T> iterator()
     {
         return _queue.iterator();
     }
+
+    private void reportAboveIfNecessary()
+    {
+        synchronized (_listener)
+        {
+            if (++_count == _flowControlHighThreshold)
+            {
+                _listener.aboveThreshold(_count);
+            }
+        }
+    }
+
+    private void reportBelowIfNecessary()
+    {
+        synchronized (_listener)
+        {
+            if (_count-- == _flowControlLowThreshold)
+            {
+                _listener.underThreshold(_count);
+            }
+        }
+    }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to