Author: jstrachan Date: Mon Nov 27 09:16:33 2006 New Revision: 479694 URL: http://svn.apache.org/viewvc?view=rev&rev=479694 Log: added support for AMQ-1073 to allow selectors to be used with virtual destinations
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/FilteredDestination.java (with props) incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/FilteredQueueTest.java (with props) incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/virtual/filtered-queue.xml (with props) Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationInterceptor.java incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/CompositeQueueTest.java Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationInterceptor.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationInterceptor.java?view=diff&rev=479694&r1=479693&r2=479694 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationInterceptor.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationInterceptor.java Mon Nov 27 09:16:33 2006 @@ -22,6 +22,7 @@ import org.apache.activemq.broker.region.DestinationFilter; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; +import org.apache.activemq.filter.MessageEvaluationContext; import java.util.Collection; import java.util.Iterator; @@ -29,7 +30,7 @@ /** * Represents a composite {@link Destination} where send()s are replicated to * each Destination instance. 
- * + * * @version $Revision$ */ public class CompositeDestinationInterceptor extends DestinationFilter { @@ -46,8 +47,29 @@ } public void send(ConnectionContext context, Message message) throws Exception { + MessageEvaluationContext messageContext = null; + for (Iterator iter = forwardDestinations.iterator(); iter.hasNext();) { - ActiveMQDestination destination = (ActiveMQDestination) iter.next(); + ActiveMQDestination destination = null; + Object value = iter.next(); + + if (value instanceof FilteredDestination) { + FilteredDestination filteredDestination = (FilteredDestination) value; + if (messageContext == null) { + messageContext = new MessageEvaluationContext(); + messageContext.setMessageReference(message); + } + messageContext.setDestination(filteredDestination.getDestination()); + if (filteredDestination.matches(messageContext)) { + destination = filteredDestination.getDestination(); + } + } + else if (value instanceof ActiveMQDestination) { + destination = (ActiveMQDestination) value; + } + if (destination == null) { + continue; + } if (copyMessage) { message = message.copy(); Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/FilteredDestination.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/FilteredDestination.java?view=auto&rev=479694 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/FilteredDestination.java (added) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/FilteredDestination.java Mon Nov 27 09:16:33 2006 @@ -0,0 +1,95 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region.virtual; + +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.filter.BooleanExpression; +import org.apache.activemq.filter.MessageEvaluationContext; +import org.apache.activemq.selector.SelectorParser; + +import javax.jms.InvalidSelectorException; +import javax.jms.JMSException; + +/** + * Represents a destination which is filtered using some predicate such as a selector + * so that messages are only dispatched to the destination if they match the filter. 
+ * + * @org.apache.xbean.XBean + * + * @version $Revision$ + */ +public class FilteredDestination { + + private ActiveMQDestination destination; + private String selector; + private BooleanExpression filter; + + public boolean matches(MessageEvaluationContext context) throws JMSException { + BooleanExpression booleanExpression = getFilter(); + if (booleanExpression == null) { + return false; + } + return booleanExpression.matches(context); + } + + public ActiveMQDestination getDestination() { + return destination; + } + + /** + * The destination to send messages to if they match the filter + */ + public void setDestination(ActiveMQDestination destination) { + this.destination = destination; + } + + public String getSelector() { + return selector; + } + + /** + * Sets the JMS selector used to filter messages before forwarding them to this destination + */ + public void setSelector(String selector) throws InvalidSelectorException { + this.selector = selector; + setFilter(new SelectorParser().parse(selector)); + } + + public BooleanExpression getFilter() { + return filter; + } + + public void setFilter(BooleanExpression filter) { + this.filter = filter; + } + + + /** + * Sets the destination property to the given queue name + */ + public void setQueue(String queue) { + setDestination(ActiveMQDestination.createDestination(queue, ActiveMQDestination.QUEUE_TYPE)); + } + + /** + * Sets the destination property to the given topic name + */ + public void setTopic(String topic) { + setDestination(ActiveMQDestination.createDestination(topic, ActiveMQDestination.TOPIC_TYPE)); + } +} Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/FilteredDestination.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/FilteredDestination.java 
------------------------------------------------------------------------------ svn:keywords = Date Author Id Revision HeadURL Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/FilteredDestination.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/CompositeQueueTest.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/CompositeQueueTest.java?view=diff&rev=479694&r1=479693&r2=479694 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/CompositeQueueTest.java (original) +++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/CompositeQueueTest.java Mon Nov 27 09:16:33 2006 @@ -30,6 +30,8 @@ import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.JMSException; import java.net.URI; @@ -43,6 +45,9 @@ private Connection connection; + protected int total = 10; + + public void testVirtualTopicCreation() throws Exception { if (connection == null) { connection = createConnection(); @@ -73,15 +78,27 @@ MessageProducer producer = session.createProducer(producerDestination); assertNotNull(producer); - int total = 10; for (int i = 0; i < total; i++) { - producer.send(session.createTextMessage("message: " + i)); + producer.send(createMessage(session, i)); } + assertMessagesArrived(messageList1, messageList2); + } + + protected void assertMessagesArrived(ConsumerBean messageList1, ConsumerBean messageList2) { messageList1.assertMessagesArrived(total); messageList2.assertMessagesArrived(total); } - + + protected TextMessage createMessage(Session session, int i) 
throws JMSException { + TextMessage textMessage = session.createTextMessage("message: " + i); + if (i % 2 == 1) { + textMessage.setStringProperty("odd", "yes"); + } + textMessage.setIntProperty("i", i); + return textMessage; + } + protected Destination getConsumer1Dsetination() { return new ActiveMQQueue("FOO"); } Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/FilteredQueueTest.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/FilteredQueueTest.java?view=auto&rev=479694 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/FilteredQueueTest.java (added) +++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/FilteredQueueTest.java Mon Nov 27 09:16:33 2006 @@ -0,0 +1,37 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.broker.virtual; + +import org.apache.activemq.spring.ConsumerBean; + +/** + * @version $Revision$ + */ +public class FilteredQueueTest extends CompositeQueueTest { + + @Override + protected String getBrokerConfigUri() { + return "org/apache/activemq/broker/virtual/filtered-queue.xml"; + } + + @Override + protected void assertMessagesArrived(ConsumerBean messageList1, ConsumerBean messageList2) { + messageList1.assertMessagesArrived(total / 2); + messageList2.assertMessagesArrived(1); + } +} Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/FilteredQueueTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/FilteredQueueTest.java ------------------------------------------------------------------------------ svn:keywords = Date Author Id Revision HeadURL Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/FilteredQueueTest.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/virtual/filtered-queue.xml URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/virtual/filtered-queue.xml?view=auto&rev=479694 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/virtual/filtered-queue.xml (added) +++ incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/virtual/filtered-queue.xml Mon Nov 27 09:16:33 2006 @@ -0,0 +1,42 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + 
contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<!-- this file can only be parsed using the xbean-spring library --> +<!-- START SNIPPET: xbean --> +<beans> + + <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" /> + + <broker xmlns="http://activemq.org/config/1.0"> + <destinationInterceptors> + <virtualDestinationInterceptor> + <virtualDestinations> + <compositeQueue name="MY.QUEUE"> + <forwardTo> + <filteredDestination selector="odd = 'yes'" queue="FOO"/> + <filteredDestination selector="i = 5" topic="BAR"/> + </forwardTo> + </compositeQueue> + </virtualDestinations> + </virtualDestinationInterceptor> + </destinationInterceptors> + + </broker> + +</beans> +<!-- END SNIPPET: xbean --> Propchange: incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/virtual/filtered-queue.xml ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/virtual/filtered-queue.xml ------------------------------------------------------------------------------ svn:keywords = Date Author Id Revision HeadURL Propchange: incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/virtual/filtered-queue.xml 
------------------------------------------------------------------------------ svn:mime-type = text/xml