On 08/05/2009 02:45 AM, Gordon Sim wrote:
This looks messy:
listener.subscribed(session.subscribe("message_queue", listener));
Informing the listener can be done as part of the implementation of
session.subscribe(); it doesn't need to be left to the user.
I'll have a think about this.
My main objection to the line above is the fact that listener is mentioned
twice. What does it mean if you use two different listeners? I think it's an
error, in which case the API should be modified to make that error impossible,
e.g. by doing the whole thing atomically in session.subscribe(). This also avoids
the error where the user forgets to call listener.subscribed(). I'm assuming
here that part of the semantics of MessageListener is that it will get a call to
subscribed() each time it is subscribed.
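
To make the atomic version concrete, here is a minimal sketch. The Session,
Subscription and MessageListener types below are simplified stand-ins written
for this example, not the real qpid classes, and RecordingListener is purely
hypothetical. The point is only that subscribe() itself calls subscribed(), so
the listener appears once at the call site.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Stand-in types for illustration only; not the real qpid client classes.
struct Subscription {
    std::string queue;
};

class MessageListener {
public:
    virtual ~MessageListener() = default;
    // Called by Session::subscribe() exactly once per subscription.
    virtual void subscribed(const Subscription& s) = 0;
};

class Session {
public:
    // Creates the subscription and informs the listener in one step, so the
    // caller cannot pair the subscription with a different listener or
    // forget the notification.
    Subscription subscribe(const std::string& queue, MessageListener& listener) {
        assert(!queue.empty());
        Subscription s{queue};
        listener.subscribed(s);
        return s;
    }
};

// Hypothetical listener that records the queues it was subscribed to.
class RecordingListener : public MessageListener {
public:
    void subscribed(const Subscription& s) override { queues.push_back(s.queue); }
    std::vector<std::string> queues;
};
```

With this shape the user code shrinks to session.subscribe("message_queue",
listener), and the two-different-listeners case cannot be written at all.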
We need to sort out our threading model in this new API. The critical
thing is to allow many sessions to be served by the same thread or
thread pool.
I agree that this is an important use case. I'll go into this in more
detail shortly.
We also want to integrate that with our own client side poller, and of
course keep the current model for backwards compatibility.
How about providing two alternatives:
Session {
    void dispatch();   // Calling thread dispatches session.
    // OR
    void activate();   // Session is dispatched by qpid's own thread pool.
                       // Does not block.
    void deactivate(); // Stop dispatching in qpid thread pool, blocks
                       // till current operation completes.
    void wait();       // Wait for activated session to be closed, or the last
                       // subscription to be cancelled.
}
I think that covers the majority of cases. The other case that has
been mentioned is providing a selectable fd to dispatch to qpid from
someone else's select/poll/epoll loop. I'd say that's an addition for
the 1.1 release of the new API, not necessarily for 1.0.
Agree (on both points!). Thanks again for all the comments and suggestions!
The comment on void wait(); above should say "wait for session to be
deactivated, which occurs when session is closed, last subscription is cancelled
or deactivate() is called on the session."
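
A rough sketch of how the two dispatch styles could sit side by side, under
some loud assumptions: a single background thread stands in for qpid's thread
pool, post() and close() are hypothetical helpers added so the example has
something to dispatch, and activate()/deactivate() are assumed to be called
from one controlling thread only (never from inside a dispatched callback).
None of this is the real qpid implementation.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Stand-in Session; one worker thread plays the part of the thread pool.
class Session {
public:
    ~Session() { deactivate(); }

    // Hypothetical helper: queue a unit of work (e.g. one message delivery).
    void post(std::function<void()> work) {
        { std::lock_guard<std::mutex> l(lock_); pending_.push_back(std::move(work)); }
        changed_.notify_all();
    }

    // Calling thread dispatches everything currently queued, then returns.
    void dispatch() {
        std::function<void()> work;
        while (take(work, false)) work();
    }

    // Start dispatching on the background thread. Does not block.
    void activate() {
        deactivate();  // join a previous worker, if any, before starting anew
        std::lock_guard<std::mutex> l(lock_);
        active_ = true;
        worker_ = std::thread([this] { run(); });
    }

    // Stop background dispatching; blocks till the current operation completes.
    void deactivate() {
        close();
        if (worker_.joinable()) worker_.join();
    }

    // Hypothetical: closing the session deactivates it without joining, so it
    // is safe to call from a dispatched callback.
    void close() {
        { std::lock_guard<std::mutex> l(lock_); active_ = false; }
        changed_.notify_all();
    }

    // Wait for the session to be deactivated (closed or deactivate() called).
    void wait() {
        std::unique_lock<std::mutex> l(lock_);
        changed_.wait(l, [this] { return !active_; });
    }

private:
    void run() {
        std::function<void()> work;
        while (take(work, true)) work();
    }

    // Pops the next item. If block is true, waits while the session is active
    // and the queue is empty, and returns false once the session is inactive.
    bool take(std::function<void()>& work, bool block) {
        std::unique_lock<std::mutex> l(lock_);
        if (block) changed_.wait(l, [this] { return !active_ || !pending_.empty(); });
        if (pending_.empty() || (block && !active_)) return false;
        work = std::move(pending_.front());
        pending_.pop_front();
        return true;
    }

    std::mutex lock_;
    std::condition_variable changed_;
    std::deque<std::function<void()>> pending_;
    bool active_ = false;
    std::thread worker_;
};
```

A caller that wants to own the thread uses dispatch(); one that doesn't calls
activate() and later wait(). Auto-deactivating when the last subscription is
cancelled would hook into the same flag that close() clears here.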
Thinking: if we auto-deactivate on subscription cancel, should we auto-activate
on subscribe (if the session was previously activated rather than dispatched)?
Thinking some more: maintaining a thread pool in global variables is going to
create a lot of start-up and shut down ordering issues. Since this is a new API
I think it might be a good idea to introduce a qpid::Qpid object which can
encapsulate the thread pool and any other context that is shared between all
connections. So the examples would go
int main() {
    Qpid qpid;
    Connection c = qpid.open(...);
    // Qpid dtor cleans up here.
}
That lets us do all our init & shut-down inside main and avoids problems of
static ctor/dtor ordering. That also gives a place to configure things like the
client-side thread pool size etc.
qpid.setWorkerThreads(10); // 10 worker threads in the pool.
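
For illustration, a minimal sketch of such a context object. Everything here
is an assumption made for the example: the pool threads simply idle until
shutdown, the pool is started lazily on the first open(), Connection is a stub
that only remembers its URL, and runningThreads() is a hypothetical accessor
added so the behaviour can be checked.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Stub connection for illustration; not the real qpid Connection.
class Connection {
public:
    explicit Connection(std::string url) : url_(std::move(url)) {}
    const std::string& url() const { return url_; }
private:
    std::string url_;
};

// Owns everything shared between connections, here just the worker threads.
class Qpid {
public:
    Qpid() = default;
    Qpid(const Qpid&) = delete;
    Qpid& operator=(const Qpid&) = delete;

    // Runs when the object leaves scope in main(), i.e. before any static
    // destructors, so shutdown order is under the user's control.
    ~Qpid() {
        { std::lock_guard<std::mutex> l(lock_); stopping_ = true; }
        wake_.notify_all();
        for (std::thread& t : pool_) t.join();
    }

    // Takes effect for threads started by later open() calls.
    void setWorkerThreads(std::size_t n) {
        assert(n > 0);
        workerThreads_ = n;
    }

    // Hypothetical accessor: number of pool threads currently started.
    std::size_t runningThreads() const { return pool_.size(); }

    // Starts the pool on first use and hands back a connection.
    Connection open(const std::string& url) {
        while (pool_.size() < workerThreads_)
            pool_.emplace_back([this] { idle(); });
        return Connection(url);
    }

private:
    // Placeholder worker body: sleep until the context shuts down.
    void idle() {
        std::unique_lock<std::mutex> l(lock_);
        wake_.wait(l, [this] { return stopping_; });
    }

    std::size_t workerThreads_ = 1;
    bool stopping_ = false;
    std::mutex lock_;
    std::condition_variable wake_;
    std::vector<std::thread> pool_;
};
```

Because no thread exists before the Qpid object is constructed and none
survives its destructor, there is nothing left for static ctor/dtor ordering
to get wrong.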
We could also provide built-in argc/argv processing:
int main(int argc, char** argv) {
    Qpid qpid(argc, argv);
    // Here argc/argv have been modified to remove all --qpid arguments.
}
That gives an out-of-the-box way for customers to write configurable clients.
For the client side I think we'd prefix all the options with --qpid- to avoid
clashes with user options. e.g. --qpid-worker-threads etc.
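
The argument stripping could look roughly like the sketch below. The function
name extractQpidOptions is hypothetical, and it assumes options are written in
the single-token --qpid-name=value form; a space-separated value would need
extra handling. A Qpid(argc, argv) constructor would have to take argc by
reference to do the same thing.

```cpp
#include <cassert>
#include <cstring>
#include <string>
#include <vector>

// Removes every argument starting with "--qpid-" from argv, compacting the
// remaining arguments in place and updating argc. Returns the removed options
// so the library can parse them. Hypothetical helper, for illustration only.
std::vector<std::string> extractQpidOptions(int& argc, char** argv) {
    assert(argv != nullptr);
    static const char prefix[] = "--qpid-";
    std::vector<std::string> taken;
    int kept = argc > 0 ? 1 : 0;  // argv[0] is the program name; always keep it
    for (int i = kept; i < argc; ++i) {
        if (std::strncmp(argv[i], prefix, sizeof(prefix) - 1) == 0)
            taken.push_back(argv[i]);   // a qpid option: hand it to the library
        else
            argv[kept++] = argv[i];     // a user option: slide it down
    }
    if (kept < argc) argv[kept] = nullptr;  // keep the array null-terminated
    argc = kept;
    return taken;
}
```

After the call the user's own option parser sees only its own arguments, e.g.
"client --qpid-worker-threads=10 --verbose" is reduced to "client --verbose".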
Above ideas shamelessly stolen from CORBA. Keeping it all in main() has a
significant maintenance advantage: less time lost chasing sporadic
shutdown bugs.
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org