Author: rajith
Date: Tue Jan 24 23:26:46 2012
New Revision: 1235550

URL: http://svn.apache.org/viewvc?rev=1235550&view=rev
Log:
QPID-3604 Once message stop is issued for each subscriber, the client
now drains the internal queues of each subscriber. It also drains the
dispatch queue. These messages are then released without marking them as
redelivered. Messages that were given to the application but were not
acked are also released, but are marked as redelivered. All messages
received upto that point are marked as completed.

Modified:
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1235550&r1=1235549&r2=1235550&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Tue Jan 24 23:26:46 2012
@@ -371,7 +371,7 @@ public abstract class AMQSession<C exten
      * Set when the dispatcher should direct incoming messages straight into 
the UnackedMessage list instead of
      * to the syncRecieveQueue or MessageListener. Used during cleanup, e.g. 
in Session.recover().
      */
-    private volatile boolean _usingDispatcherForCleanup;
+    protected volatile boolean _usingDispatcherForCleanup;
 
     /** Used to indicates that the connection to which this session belongs, 
has been stopped. */
     private boolean _connectionStopped;
@@ -2247,6 +2247,58 @@ public abstract class AMQSession<C exten
         }
     }
 
+    void drainDispatchQueue()
+    {
+        if (Thread.currentThread() == _dispatcherThread)
+        {
+            while (!_closed.get() && !_queue.isEmpty())
+            {
+                Dispatchable disp;
+                try
+                {
+                    disp = (Dispatchable) _queue.take();
+                }
+                catch (InterruptedException e)
+                {
+                    throw new RuntimeException(e);
+                }
+
+                // Check just in case _queue becomes empty, it shouldn't but
+                // better than an NPE.
+                if (disp == null)
+                {
+                    _logger.debug("_queue became empty during sync.");
+                    break;
+                }
+
+                disp.dispatch(AMQSession.this);
+            }
+        }
+        else
+        {
+            startDispatcherIfNecessary(false);
+
+            final CountDownLatch signal = new CountDownLatch(1);
+
+            _queue.add(new Dispatchable()
+            {
+                public void dispatch(AMQSession ssn)
+                {
+                    signal.countDown();
+                }
+            });
+
+            try
+            {
+                signal.await();
+            }
+            catch (InterruptedException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
     /**
      * Resubscribes all producers and consumers. This is called when 
performing failover.
      *

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1235550&r1=1235549&r2=1235550&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 Tue Jan 24 23:26:46 2012
@@ -1354,5 +1354,45 @@ public class AMQSession_0_10 extends AMQ
         super.resubscribe();
         getQpidSession().sync();
     }
+
+    @Override
+    void stop() throws AMQException
+    {
+        super.stop();
+        synchronized (getMessageDeliveryLock())
+        {
+               for (BasicMessageConsumer consumer : _consumers.values())
+               {
+                   List<Long> tags = 
consumer.drainReceiverQueueAndRetrieveDeliveryTags();
+                   _prefetchedMessageTags.addAll(tags);
+               }
+        }
+        _usingDispatcherForCleanup = true;
+        drainDispatchQueue();
+        _usingDispatcherForCleanup = false;
+
+        RangeSet delivered = gatherRangeSet(_unacknowledgedMessageTags);
+               RangeSet prefetched = gatherRangeSet(_prefetchedMessageTags);
+               RangeSet all = RangeSetFactory.createRangeSet(delivered.size()
+                                       + prefetched.size());
+
+               for (Iterator<Range> deliveredIter = delivered.iterator(); 
deliveredIter.hasNext();)
+               {
+                       Range range = deliveredIter.next();
+                       all.add(range);
+               }
+
+               for (Iterator<Range> prefetchedIter = prefetched.iterator(); 
prefetchedIter.hasNext();)
+               {
+                       Range range = prefetchedIter.next();
+                       all.add(range);
+               }
+
+               flushProcessed(all, false);
+               
getQpidSession().messageRelease(delivered,Option.SET_REDELIVERED);
+               getQpidSession().messageRelease(prefetched);
+               sync();
+    }
+
 }
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to