The drainDispatchQueue() method has a fair amount of duplicate code with the syncDispatchQueue() method. Next step is to combine the common parts to get rid of duplicate code.
Rajith ---------- Forwarded message ---------- From: <[email protected]> Date: Tue, Jan 24, 2012 at 6:48 PM Subject: failure notice To: [email protected] Hi. This is the qmail-send program at apache.org. I'm afraid I wasn't able to deliver your message to the following addresses. This is a permanent error; I've given up. Sorry it didn't work out. <[email protected]>: Must be sent from an @apache.org address. --- Below this line is a copy of the message. Return-Path: <[email protected]> Received: (qmail 63077 invoked by uid 99); 24 Jan 2012 23:48:06 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 24 Jan 2012 23:48:06 +0000 X-ASF-Spam-Status: No, hits=-0.5 required=5.0 tests=FREEMAIL_ENVFROM_END_DIGIT,RCVD_IN_DNSWL_LOW,SPF_PASS X-Spam-Check-By: apache.org Received-SPF: pass (nike.apache.org: domain of [email protected] designates 209.85.214.170 as permitted sender) Received: from [209.85.214.170] (HELO mail-tul01m020-f170.google.com) (209.85.214.170) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 24 Jan 2012 23:47:57 +0000 Received: by obbup3 with SMTP id up3so3359185obb.15 for <[email protected]>; Tue, 24 Jan 2012 15:47:36 -0800 (PST) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=gamma; h=mime-version:in-reply-to:references:date:message-id:subject:from:to :content-type:content-transfer-encoding; bh=szoMtIlzqxU0xVU4u3wYecP/SmwatOSwERUNmFdPMTg=; b=KDIxjVicAJNjeUe0nofclYwJquaBDD3vSzvgatx1/C5IpB/bYPSNK5V47KJ54MZsm9 Cw2gEXTsSmZUJJHr4JFpmQ79qtlWMeXQHfLHYshY+GtcidaC0GOvu7L2IKTF0bq5stI2 Sw/y1xnzBiGStfzpwJdebxJeCIdX13ugVnGvk= MIME-Version: 1.0 Received: by 10.182.192.36 with SMTP id hd4mr932729obc.60.1327448856745; Tue, 24 Jan 2012 15:47:36 -0800 (PST) Received: by 10.60.28.225 with HTTP; Tue, 24 Jan 2012 15:47:36 -0800 (PST) In-Reply-To: <[email protected]> References: <[email protected]> Date: Tue, 24 Jan 2012 18:47:36 -0500 Message-ID: <ca+m2xzsxskrpdnnfgkfhkccdyn1ujzk44autgbmqvm_hd21...@mail.gmail.com> Subject: Re: svn commit: r1235550 - in /qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client: AMQSession.java AMQSession_0_10.java From: Rajith Attapattu <[email protected]> To: [email protected] Content-Type: text/plain; charset=ISO-8859-1 Content-Transfer-Encoding: quoted-printable X-Virus-Checked: Checked by ClamAV on apache.org The drainDispatchQueue() method has a fair amount of duplicate code with the syncDispatchQueue() method. Next step is to combine the common parts to get rid of duplicate code. Rajith On Tue, Jan 24, 2012 at 6:26 PM, <[email protected]> wrote: > Author: rajith > Date: Tue Jan 24 23:26:46 2012 > New Revision: 1235550 > > URL: http://svn.apache.org/viewvc?rev=3D1235550&view=3Drev > Log: > QPID-3604 Once message stop is issued for each subscriber, the client > now drains the internal queues of each subscriber. It also drains the > dispatch queue. These messages are then released without marking them as > redelivered. Messages that were given to the application but were not > acked are also released, but are marked as redelivered. All messages > received upto that point are marked as completed. > > Modified: > =A0 =A0qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/A= MQSession.java > =A0 =A0qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/A= MQSession_0_10.java > > Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/clien= t/AMQSession.java > URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/ja= va/org/apache/qpid/client/AMQSession.java?rev=3D1235550&r1=3D1235549&r2=3D1= 235550&view=3Ddiff > =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D > --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQS= ession.java (original) > +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQS= ession.java Tue Jan 24 23:26:46 2012 > @@ -371,7 +371,7 @@ public abstract class AMQSession<C exten > =A0 =A0 =A0* Set when the dispatcher should direct incoming messages stra= ight into the UnackedMessage list instead of > =A0 =A0 =A0* to the syncRecieveQueue or MessageListener. Used during clea= nup, e.g. in Session.recover(). > =A0 =A0 =A0*/ > - =A0 =A0private volatile boolean _usingDispatcherForCleanup; > + =A0 =A0protected volatile boolean _usingDispatcherForCleanup; > > =A0 =A0 /** Used to indicates that the connection to which this session b= elongs, has been stopped. */ > =A0 =A0 private boolean _connectionStopped; > @@ -2247,6 +2247,58 @@ public abstract class AMQSession<C exten > =A0 =A0 =A0 =A0 } > =A0 =A0 } > > + =A0 =A0void drainDispatchQueue() > + =A0 =A0{ > + =A0 =A0 =A0 =A0if (Thread.currentThread() =3D=3D _dispatcherThread) > + =A0 =A0 =A0 =A0{ > + =A0 =A0 =A0 =A0 =A0 =A0while (!_closed.get() && !_queue.isEmpty()) > + =A0 =A0 =A0 =A0 =A0 =A0{ > + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0Dispatchable disp; > + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0try > + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0{ > + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0disp =3D (Dispatchable) _queue.t= ake(); > + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0} > + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0catch (InterruptedException e) > + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0{ > + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0throw new RuntimeException(e); > + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0} > + > + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0// Check just in case _queue becomes emp= ty, it shouldn't but > + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0// better than an NPE. > + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0if (disp =3D=3D null) > + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0{ > + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0_logger.debug("_queue became emp= ty during sync."); > + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0break; > + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0} > + > + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0disp.dispatch(AMQSession.this); > + =A0 =A0 =A0 =A0 =A0 =A0} > + =A0 =A0 =A0 =A0} > + =A0 =A0 =A0 =A0else > + =A0 =A0 =A0 =A0{ > + =A0 =A0 =A0 =A0 =A0 =A0startDispatcherIfNecessary(false); > + > + =A0 =A0 =A0 =A0 =A0 =A0final CountDownLatch signal =3D new CountDownLat= ch(1); > + > + =A0 =A0 =A0 =A0 =A0 =A0_queue.add(new Dispatchable() > + =A0 =A0 =A0 =A0 =A0 =A0{ > + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0public void dispatch(AMQSession ssn) > + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0{ > + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0signal.countDown(); > + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0} > + =A0 =A0 =A0 =A0 =A0 =A0}); > + > + =A0 =A0 =A0 =A0 =A0 =A0try > + =A0 =A0 =A0 =A0 =A0 =A0{ > + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0signal.await(); > + =A0 =A0 =A0 =A0 =A0 =A0} > + =A0 =A0 =A0 =A0 =A0 =A0catch (InterruptedException e) > + =A0 =A0 =A0 =A0 =A0 =A0{ > + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0throw new RuntimeException(e); > + =A0 =A0 =A0 =A0 =A0 =A0} > + =A0 =A0 =A0 =A0} > + =A0 =A0} > + > =A0 =A0 /** > =A0 =A0 =A0* Resubscribes all producers and consumers. This is called whe= n performing failover. > =A0 =A0 =A0* > > Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/clien= t/AMQSession_0_10.java > URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/ja= va/org/apache/qpid/client/AMQSession_0_10.java?rev=3D1235550&r1=3D1235549&r= 2=3D1235550&view=3Ddiff > =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D > --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQS= ession_0_10.java (original) > +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQS= ession_0_10.java Tue Jan 24 23:26:46 2012 > @@ -1354,5 +1354,45 @@ public class AMQSession_0_10 extends AMQ > =A0 =A0 =A0 =A0 super.resubscribe(); > =A0 =A0 =A0 =A0 getQpidSession().sync(); > =A0 =A0 } > + > + =A0 =A0@Override > + =A0 =A0void stop() throws AMQException > + =A0 =A0{ > + =A0 =A0 =A0 =A0super.stop(); > + =A0 =A0 =A0 =A0synchronized (getMessageDeliveryLock()) > + =A0 =A0 =A0 =A0{ > + =A0 =A0 =A0 =A0 =A0 =A0 =A0 for (BasicMessageConsumer consumer : _consu= mers.values()) > + =A0 =A0 =A0 =A0 =A0 =A0 =A0 { > + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 List<Long> tags =3D consumer.drainR= eceiverQueueAndRetrieveDeliveryTags(); > + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 _prefetchedMessageTags.addAll(tags)= ; > + =A0 =A0 =A0 =A0 =A0 =A0 =A0 } > + =A0 =A0 =A0 =A0} > + =A0 =A0 =A0 =A0_usingDispatcherForCleanup =3D true; > + =A0 =A0 =A0 =A0drainDispatchQueue(); > + =A0 =A0 =A0 =A0_usingDispatcherForCleanup =3D false; > + > + =A0 =A0 =A0 =A0RangeSet delivered =3D gatherRangeSet(_unacknowledgedMes= sageTags); > + =A0 =A0 =A0 =A0 =A0 =A0 =A0 RangeSet prefetched =3D gatherRangeSet(_pre= fetchedMessageTags); > + =A0 =A0 =A0 =A0 =A0 =A0 =A0 RangeSet all =3D RangeSetFactory.createRang= eSet(delivered.size() > + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0= =A0 + prefetched.size()); > + > + =A0 =A0 =A0 =A0 =A0 =A0 =A0 for (Iterator<Range> deliveredIter =3D deli= vered.iterator(); deliveredIter.hasNext();) > + =A0 =A0 =A0 =A0 =A0 =A0 =A0 { > + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 Range range =3D deliveredIt= er.next(); > + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 all.add(range); > + =A0 =A0 =A0 =A0 =A0 =A0 =A0 } > + > + =A0 =A0 =A0 =A0 =A0 =A0 =A0 for (Iterator<Range> prefetchedIter =3D pre= fetched.iterator(); prefetchedIter.hasNext();) > + =A0 =A0 =A0 =A0 =A0 =A0 =A0 { > + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 Range range =3D prefetchedI= ter.next(); > + =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 =A0 all.add(range); > + =A0 =A0 =A0 =A0 =A0 =A0 =A0 } > + > + =A0 =A0 =A0 =A0 =A0 =A0 =A0 flushProcessed(all, false); > + =A0 =A0 =A0 =A0 =A0 =A0 =A0 getQpidSession().messageRelease(delivered,O= ption.SET_REDELIVERED); > + =A0 =A0 =A0 =A0 =A0 =A0 =A0 getQpidSession().messageRelease(prefetched)= ; > + =A0 =A0 =A0 =A0 =A0 =A0 =A0 sync(); > + =A0 =A0} > + > =A0} > > > > > --------------------------------------------------------------------- > Apache Qpid - AMQP Messaging Implementation > Project: =A0 =A0 =A0http://qpid.apache.org > Use/Interact: mailto:[email protected] > --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:[email protected]
