BTW lets move this thread over to the dev list :)
On 7/14/06, James Strachan <[EMAIL PROTECTED]> wrote:
I thought I'd change the subject as really we are discussing a kind of Virtual Topics where folks can use Queues to subscribe and consume from them etc... On 7/13/06, bmadigan <[EMAIL PROTECTED]> wrote: > > This is almost working, YAY! > there are a few things I need to fix: > - Need to figure out how to add the new Broker to the factory without using > the plugin loader I was thinking, we should maybe add the Virtual Topic interceptor to the broker by default, but allow the configuration to be overridden in the activemq.xml as I can't help think it's be great to be able to use it out of the box. We've already got the "ActiveMQ.Advisory." prefix in topics to be used for advisories. So how about we use ActiveMQ.Virtual. as the prefix for the default prefix for virtual topics & we let folks customize this if they don't like the defaults? So we should maybe add a VirtualDestinationPlugin property on the BrokerService which is created by default (unless explicitly disabled using a bean property) which would auto-default to a sensible default that could be overloaded- or forks could set the virtual destinations to something else if they prefer etc. > - It may not be a problem, but I'm synchronizing on next when I create the > queues for the virtual groups in addConsumer(). This could be finer grained > I think. Am not sure if you need to do that; I think the default mechanism of clients consuming on the queue with the destination auto-created on the fly would be fine? > - I'm calling ConsumerInfo.setDestination(virtualQueue) to point the > consumer to a virtual queue. This is probably incorrect, not sure if there > is a better way. Are you trying to transform a durable topic consumer into a queue consumer? Am not sure you need. Am thinking if you are a topic consumer (durable or non-durable) then we leave you as you are; a fully JMS compliant durable topic consumer. However to use the nice queue-centric virtual topics, you really use a queue consumer of the right name and things all just work. (BTW the topic regions are optimised so that if no consumers are available, publishing to a topic is a no-op) > - The virtual queues can't provide subscription recovery. Not sure how to > handle that. Yeah thats true. Am not too worried about that right now - but we could look at fixing that later on. > I created a BrokerFilter subclass which overrides addConsumer() and send(): > > public Subscription addConsumer(ConnectionContext cc, > ConsumerInfo ci) throws Exception { > synchronized(next){ > String name = ci.getDestination().getPhysicalName(); > if(name.startsWith(VIRTUAL)){ > Set destinations = getDestinations( > new ActiveMQQueue(name)); > if(destinations.size()==0){//create a new virtual queue > ActiveMQQueue queue = new ActiveMQQueue( > name+"?consumer.exclusive=true"); > next.addDestination(cc,queue); > ci.setDestination(queue); > }else{ //queue exists, add the consumer > ActiveMQQueue queue = (ActiveMQQueue) > destinations.iterator().next(); > ci.setDestination(queue); > } > } > } > return next.addConsumer(cc, ci); > } > > public void send(ConnectionContext ctx, > Message message) throws Exception { > String topic = message.getDestination().getPhysicalName(); > Iterator destinations = getDestinations( > new ActiveMQQueue(VIRTUAL + ".*." + topic)).iterator(); > while(destinations.hasNext()){ > Destination dest = (Destination) destinations.next(); > dest.send(ctx, message); > } > next.send(ctx, message); > } > > Except for the subscription recovery part, this seems to work. Looks great - though am thinking we only really need the send() part - as the other stuff like adding consumers should just work out side the box (unless I'm missing something). BTW do you want to submit a patch, then we can start wiring this stuff in? Great work though - am excited to see this stuff implemented! :) -- James ------- http://radio.weblogs.com/0112098/
-- James ------- http://radio.weblogs.com/0112098/