Author: dejanb
Date: Thu Oct 21 09:32:47 2010
New Revision: 1025905
URL: http://svn.apache.org/viewvc?rev=1025905&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2993 - virtual topics and
advisory messages
Added:
activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/usecases/browse-broker1.xml
activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/usecases/browse-broker2.xml
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrowseOverNetworkTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java?rev=1025905&r1=1025904&r2=1025905&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java Thu Oct 21 09:32:47 2010
@@ -41,8 +41,10 @@ public class VirtualTopicInterceptor ext
}
public void send(ProducerBrokerExchange context, Message message) throws Exception {
- ActiveMQDestination queueConsumers = getQueueConsumersWildcard(message.getDestination());
- send(context, message, queueConsumers);
+ if (!message.isAdvisory()) {
+ ActiveMQDestination queueConsumers = getQueueConsumersWildcard(message.getDestination());
+ send(context, message, queueConsumers);
+ }
super.send(context, message);
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrowseOverNetworkTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrowseOverNetworkTest.java?rev=1025905&r1=1025904&r2=1025905&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrowseOverNetworkTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrowseOverNetworkTest.java Thu Oct 21 09:32:47 2010
@@ -28,21 +28,26 @@ import org.apache.activemq.broker.region
import org.apache.activemq.util.MessageIdList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.springframework.core.io.ClassPathResource;
public class BrowseOverNetworkTest extends JmsMultipleBrokersTestSupport {
private static final Log LOG = LogFactory.getLog(QueueSubscription.class);
protected static final int MESSAGE_COUNT = 10;
public void testBrowse() throws Exception {
+ createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB?persistent=false&useJmx=false"));
+ createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA?persistent=false&useJmx=false"));
+
bridgeBrokers("BrokerA", "BrokerB");
+
startAllBrokers();
Destination dest = createDestination("TEST.FOO", false);
sendMessages("BrokerA", dest, MESSAGE_COUNT);
- browseMessages(dest);
+ browseMessages("BrokerB", dest);
Thread.sleep(2000);
@@ -61,23 +66,40 @@ public class BrowseOverNetworkTest exten
+ msgsB.getMessageCount());
}
- protected void browseMessages(Destination dest) throws Exception {
- QueueBrowser browser = createBrowser("BrokerB", dest);
+ public void testconsumerInfo() throws Exception {
+ createBroker(new ClassPathResource("org/apache/activemq/usecases/browse-broker1.xml"));
+ createBroker(new ClassPathResource("org/apache/activemq/usecases/browse-broker2.xml"));
+
+ startAllBrokers();
+
+ brokers.get("broker1").broker.waitUntilStarted();
+
+
+ Destination dest = createDestination("QUEUE.A,QUEUE.B", false);
+
+
+ int broker1 = browseMessages("broker1", dest);
+ assertEquals("Browsed a message on an empty queue", 0, broker1);
+ Thread.sleep(1000);
+ int broker2 = browseMessages("broker2", dest);
+ assertEquals("Browsed a message on an empty queue", 0, broker2);
+
+ }
+
+ protected int browseMessages(String broker, Destination dest) throws Exception {
+ QueueBrowser browser = createBrowser(broker, dest);
Enumeration msgs = browser.getEnumeration();
int browsedMessage = 0;
while (msgs.hasMoreElements()) {
browsedMessage++;
msgs.nextElement();
}
+ return browsedMessage;
}
public void setUp() throws Exception {
super.setAutoFail(true);
super.setUp();
- createBroker(new URI(
- "broker:(tcp://localhost:61616)/BrokerA?persistent=false&useJmx=false"));
- createBroker(new URI(
- "broker:(tcp://localhost:61617)/BrokerB?persistent=false&useJmx=false"));
}
}
Added:
activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/usecases/browse-broker1.xml
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/usecases/browse-broker1.xml?rev=1025905&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/usecases/browse-broker1.xml (added)
+++ activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/usecases/browse-broker1.xml Thu Oct 21 09:32:47 2010
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<beans
+ xmlns="http://www.springframework.org/schema/beans"
+ xmlns:amq="http://activemq.apache.org/schema/core"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
+ http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
+ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
+
+ <!-- Broker1 ?useQueueForAccept=false -->
+ <amq:broker brokerName="broker1" id="broker1" useJmx="true"
+ persistent="true" start="false" advisorySupport="true">
+
+
+ <amq:destinationInterceptors>
+ <amq:virtualDestinationInterceptor>
+ <amq:virtualDestinations>
+ <amq:virtualTopic name=">" prefix="VTopic.*." />
+ </amq:virtualDestinations>
+ </amq:virtualDestinationInterceptor>
+ </amq:destinationInterceptors>
+
+ <amq:networkConnectors>
+
+ <amq:networkConnector uri="static:(tcp://localhost:61617)" name="broker1_broker2">
+
+ <amq:excludedDestinations>
+ <amq:queue physicalName="QUEUE.A"/>
+ </amq:excludedDestinations>
+ <amq:staticallyIncludedDestinations>
+ <amq:queue physicalName="QUEUE.B"/>
+ </amq:staticallyIncludedDestinations>
+ </amq:networkConnector>
+
+ </amq:networkConnectors>
+
+ <amq:transportConnectors>
+ <amq:transportConnector uri="tcp://localhost:61616" />
+ </amq:transportConnectors>
+
+ </amq:broker>
+
+
+</beans>
Added:
activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/usecases/browse-broker2.xml
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/usecases/browse-broker2.xml?rev=1025905&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/usecases/browse-broker2.xml (added)
+++ activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/usecases/browse-broker2.xml Thu Oct 21 09:32:47 2010
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<beans
+ xmlns="http://www.springframework.org/schema/beans"
+ xmlns:amq="http://activemq.apache.org/schema/core"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
+ http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
+ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
+
+
+ <!-- Broker2 (lonb) -->
+ <amq:broker brokerName="broker2" id="broker2" useJmx="true"
+ persistent="true" start="false" advisorySupport="true">
+
+ <!-- Network connectors -->
+ <amq:networkConnectors>
+
+ <amq:networkConnector uri="static:(tcp://localhost:61616)" name="broker2_broker1">
+
+ <amq:excludedDestinations>
+ <amq:queue physicalName="QUEUE.B"/>
+ </amq:excludedDestinations>
+ <amq:staticallyIncludedDestinations>
+ <amq:queue physicalName="QUEUE.A"/>
+ </amq:staticallyIncludedDestinations>
+ </amq:networkConnector>
+
+ </amq:networkConnectors>
+
+ <amq:transportConnectors>
+ <amq:transportConnector uri="tcp://localhost:61617" />
+ </amq:transportConnectors>
+
+ </amq:broker>
+
+ </beans>
\ No newline at end of file