Rafael Schloming wrote:
Martin Ritchie wrote:
Hi,

Following on from looking at QPID-1871, I believe that quite a
significant change is required to ensure that message order on
rollback is maintained.

I propose that we extract the Dispatcher from AMQSession, which will
simplify our biggest class (3100+ lines!) and show clear
responsibility for incoming message processing. This will simplify
rollback as the Dispatcher thread can be given full responsibility for
clearing up the state that it knows best. In the current situation,
the calling thread does some work on AMQSession whilst the Dispatcher
is running/stopping, then calls the Dispatcher code directly to clean
up the remainder, all while the Dispatcher may be processing a
message.

Change design posted here:
http://cwiki.apache.org/confluence/display/qpid/0.6+Java+Client+Dispatcher+Changes

Comments on the investigation, implications and design welcome.
I'll capture the details on the wiki so we don't lose track of comments.

Hey Martin,

Sorry I didn't pick up on this earlier. We hit this issue a while back in the 0-10 code path. That's why we added RollbackOrderTest, and that's why it doesn't fail for 0-10 brokers. You should probably check out AMQSession.syncDispatchQueue; this method pretty much solves the problem you're describing. It blocks until everything that is currently in the dispatch queue has been processed by the dispatcher thread. If it is called after incoming message flow has been stopped, that means it blocks until the dispatch queue is empty.

This method is used in a few places in the 0-10 code path where it is necessary to clean out the dispatch queue prior to proceeding (e.g. during failover); however, the key place here is AMQSession_0_10.releaseForRollback. If you look at this you'll notice that syncDispatchQueue is called before the release is actually done. If AMQSession_0_8.releaseForRollback were to do the same, or preferably if we were to move the syncDispatchQueue call up to AMQSession.java, then I suspect this problem would go away without the need for a large refactor.
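
To make the ordering concrete, here is a minimal sketch of "sync first, then release". It is not the Qpid code: the class RollbackOrderSketch and all of its members are hypothetical, and a single-threaded ExecutorService stands in for the dispatcher thread and its queue. The point it illustrates is only that the release step runs after everything already queued has been dispatched.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a session; none of these names are Qpid's.
class RollbackOrderSketch {
    // One thread plays the role of the dispatcher; its task queue is the dispatch queue.
    private final ExecutorService dispatcher = Executors.newSingleThreadExecutor();
    // Messages that have been dispatched but not yet acknowledged.
    private final List<String> delivered = Collections.synchronizedList(new ArrayList<>());

    // Queue a message for dispatch (assumes incoming flow is controlled by the caller).
    void enqueue(String msg) {
        dispatcher.execute(() -> delivered.add(msg));
    }

    // Block until every task queued before this call has run on the dispatcher thread.
    void syncDispatchQueue() {
        try {
            dispatcher.submit(() -> { }).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("dispatch queue did not drain", e);
        }
    }

    // Sync BEFORE releasing, so no message is still in flight on the dispatcher.
    List<String> releaseForRollback() {
        syncDispatchQueue();
        synchronized (delivered) {
            List<String> released = new ArrayList<>(delivered);
            delivered.clear();
            return released;
        }
    }

    void close() {
        dispatcher.shutdown();
    }
}
```

If the sync call were skipped, releaseForRollback could run while earlier messages were still waiting to be dispatched, and the released list would miss them.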

The other thing you may want to look at is the Dispatchable interface. This is how syncDispatchQueue works, and I believe it is a similar concept to the ServiceRequests that you mention.
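
As a rough illustration of the idea (not the actual Qpid interface, whose signature is not shown here), the sketch below puts a marker item on the same queue as the messages and waits for the dispatcher thread to reach it. DispatchableSketch, DispatchQueueSketch and their methods are all hypothetical names chosen for this example.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical: anything placed on the dispatch queue knows how to dispatch itself.
interface DispatchableSketch {
    void dispatch(List<String> sink);
}

class DispatchQueueSketch {
    private final BlockingQueue<DispatchableSketch> queue = new LinkedBlockingQueue<>();
    private final List<String> processed = new CopyOnWriteArrayList<>();
    private final Thread dispatcher;

    DispatchQueueSketch() {
        dispatcher = new Thread(() -> {
            try {
                while (true) {
                    // Items are handled strictly in FIFO order by this one thread.
                    queue.take().dispatch(processed);
                }
            } catch (InterruptedException e) {
                // Interrupted by close(): leave the loop and let the thread end.
            }
        });
        dispatcher.setDaemon(true);
        dispatcher.start();
    }

    // An ordinary message: dispatching it records the body.
    void enqueueMessage(String body) {
        queue.add(sink -> sink.add(body));
    }

    // Enqueue a marker and wait for the dispatcher to reach it. Because the queue
    // is FIFO, everything queued before the marker has been processed by then.
    boolean sync(long timeoutMillis) {
        CountDownLatch reached = new CountDownLatch(1);
        queue.add(sink -> reached.countDown());
        try {
            return reached.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    List<String> processed() {
        return processed;
    }

    void close() {
        dispatcher.interrupt();
    }
}
```

The calling thread never touches dispatcher state directly; it only queues work and waits, which is the same division of responsibility the ServiceRequest proposal is after.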

--Rafael


---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:dev-subscr...@qpid.apache.org
