Author: dejanb
Date: Tue Jan 27 10:42:35 2009
New Revision: 738052
URL: http://svn.apache.org/viewvc?rev=738052&view=rev
Log:
fix for https://issues.apache.org/activemq/browse/AMQ-2086
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TestBrokerConnectionDuplexExcludedDestinations.java
activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/usecases/receiver-duplex.xml
activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/usecases/sender-duplex.xml
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=738052&r1=738051&r2=738052&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Tue Jan 27 10:42:35 2009
@@ -441,6 +441,9 @@
if
(AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination())) {
serviceRemoteConsumerAdvisory(message.getDataStructure());
} else {
+ if
(!isPermissableDestination(message.getDestination())) {
+ return;
+ }
if (message.isResponseRequired()) {
Response reply = new Response();
reply.setCorrelationId(message.getCommandId());
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TestBrokerConnectionDuplexExcludedDestinations.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TestBrokerConnectionDuplexExcludedDestinations.java?rev=738052&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TestBrokerConnectionDuplexExcludedDestinations.java
(added)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TestBrokerConnectionDuplexExcludedDestinations.java
Tue Jan 27 10:42:35 2009
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.usecases;
+
+import java.net.URI;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+
+/**
+ * Checks that the destination filters configured on a duplex network
+ * connector are also applied to messages travelling from the remote broker
+ * back to the broker that opened the connection.
+ * <p>
+ * Two brokers are created from XBean configuration files: the "Hub"
+ * (receiver-duplex.xml, transport connector on tcp://localhost:62002) and the
+ * "Spoke" (sender-duplex.xml, transport connector on tcp://localhost:62001).
+ * The Spoke configuration declares a duplex network connector to the Hub that
+ * excludes the queue <code>exclude.test.foo</code> and dynamically includes
+ * the queue <code>include.test.foo</code>.
+ */
+public class TestBrokerConnectionDuplexExcludedDestinations extends TestCase {
+
+    /**
+     * Sends one message to the excluded queue and one to the included queue
+     * on the Hub, then consumes from both queues on the Spoke. Only the
+     * message sent to the included queue is expected to arrive.
+     *
+     * @throws Exception if a broker cannot be created or started, or if any
+     *             JMS operation fails
+     */
+    public void testBrokerConnectionDuplexPropertiesPropagation()
+            throws Exception {
+
+        // Hub broker
+        String configFileName = "org/apache/activemq/usecases/receiver-duplex.xml";
+        URI uri = new URI("xbean:" + configFileName);
+        final BrokerService receiverBroker = BrokerFactory.createBroker(uri);
+        receiverBroker.setPersistent(false);
+        receiverBroker.setBrokerName("Hub");
+
+        // Spoke broker
+        configFileName = "org/apache/activemq/usecases/sender-duplex.xml";
+        uri = new URI("xbean:" + configFileName);
+        final BrokerService senderBroker = BrokerFactory.createBroker(uri);
+        senderBroker.setPersistent(false);
+        // The name must be set on the sender; setting it on the receiver
+        // would overwrite "Hub" and leave the sender with its XML name.
+        senderBroker.setBrokerName("Spoke");
+
+        Connection hubConnection = null;
+        Connection spokeConnection = null;
+        try {
+            // Start both Hub and Spoke broker
+            receiverBroker.start();
+            senderBroker.start();
+
+            final ConnectionFactory cfHub = new ActiveMQConnectionFactory(
+                    "tcp://localhost:62002");
+            hubConnection = cfHub.createConnection();
+            hubConnection.start();
+            final Session hubSession = hubConnection.createSession(false,
+                    Session.AUTO_ACKNOWLEDGE);
+            // Anonymous producer: the destination is supplied on each send.
+            final MessageProducer hubProducer = hubSession.createProducer(null);
+            hubProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+            hubProducer.setDisableMessageID(true);
+            hubProducer.setDisableMessageTimestamp(true);
+
+            final Queue excludedQueueHub = hubSession.createQueue("exclude.test.foo");
+            final TextMessage excludedMsgHub = hubSession.createTextMessage();
+            excludedMsgHub.setText(excludedQueueHub.toString());
+
+            final Queue includedQueueHub = hubSession.createQueue("include.test.foo");
+            final String includedText = includedQueueHub.toString();
+            final TextMessage includedMsgHub = hubSession.createTextMessage();
+            // Each message carries the name of its own queue as payload.
+            includedMsgHub.setText(includedText);
+
+            // Sending from Hub queue
+            hubProducer.send(excludedQueueHub, excludedMsgHub);
+            hubProducer.send(includedQueueHub, includedMsgHub);
+
+            final ConnectionFactory cfSpoke = new ActiveMQConnectionFactory(
+                    "tcp://localhost:62001");
+            spokeConnection = cfSpoke.createConnection();
+            spokeConnection.start();
+            final Session spokeSession = spokeConnection.createSession(false,
+                    Session.AUTO_ACKNOWLEDGE);
+            final Queue excludedQueueSpoke = spokeSession.createQueue("exclude.test.foo");
+            final MessageConsumer excludedConsumerSpoke = spokeSession
+                    .createConsumer(excludedQueueSpoke);
+
+            final Queue includedQueueSpoke = spokeSession.createQueue("include.test.foo");
+            final MessageConsumer includedConsumerSpoke = spokeSession
+                    .createConsumer(includedQueueSpoke);
+
+            // Receiving from excluded Spoke queue
+            Message msg = excludedConsumerSpoke.receive(200);
+            assertNull("message for an excluded queue reached the Spoke", msg);
+
+            // Receiving from included Spoke queue
+            msg = includedConsumerSpoke.receive(200);
+            assertNotNull("message for the included queue did not reach the Spoke", msg);
+            assertEquals(includedMsgHub, msg);
+            assertEquals(includedText, ((TextMessage) msg).getText());
+        } finally {
+            // Always release connections and brokers so a failed assertion
+            // does not leave the two TCP ports bound. Closing a JMS
+            // connection also closes its sessions, producers and consumers.
+            try {
+                if (hubConnection != null) {
+                    hubConnection.close();
+                }
+            } finally {
+                try {
+                    if (spokeConnection != null) {
+                        spokeConnection.close();
+                    }
+                } finally {
+                    try {
+                        senderBroker.stop();
+                    } finally {
+                        receiverBroker.stop();
+                    }
+                }
+            }
+        }
+    }
+}
Added:
activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/usecases/receiver-duplex.xml
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/usecases/receiver-duplex.xml?rev=738052&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/usecases/receiver-duplex.xml
(added)
+++
activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/usecases/receiver-duplex.xml
Tue Jan 27 10:42:35 2009
@@ -0,0 +1,48 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<beans
+ xmlns="http://www.springframework.org/schema/beans"
+ xmlns:amq="http://activemq.org/config/1.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+ http://activemq.org/config/1.0
http://activemq.apache.org/schema/activemq-core.xsd
+ http://activemq.apache.org/camel/schema/spring
http://activemq.apache.org/camel/schema/spring/camel-spring.xsd">
+
+ <!-- Allows us to use system properties as variables in this configuration
file -->
+ <bean
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+ <broker brokerName="receiver" persistent="false" useJmx="true"
xmlns="http://activemq.org/config/1.0" >
+
+
+
+ <!-- The transport connectors ActiveMQ will listen to -->
+ <transportConnectors>
+ <transportConnector uri="tcp://localhost:62002"/>
+ </transportConnectors>
+
+ <!-- This broker defines no network connectors of its own -->
+ <networkConnectors>
+ </networkConnectors>
+
+
+ </broker>
+
+
+
+</beans>
+
+
Added:
activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/usecases/sender-duplex.xml
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/usecases/sender-duplex.xml?rev=738052&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/usecases/sender-duplex.xml
(added)
+++
activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/usecases/sender-duplex.xml
Tue Jan 27 10:42:35 2009
@@ -0,0 +1,67 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<beans
+ xmlns="http://www.springframework.org/schema/beans"
+ xmlns:amq="http://activemq.org/config/1.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+ http://activemq.org/config/1.0
http://activemq.apache.org/schema/activemq-core.xsd
+ http://activemq.apache.org/camel/schema/spring
http://activemq.apache.org/camel/schema/spring/camel-spring.xsd">
+
+ <!-- Allows us to use system properties as variables in this configuration
file -->
+ <bean
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+ <broker brokerName="sender" persistent="false" useJmx="true"
xmlns="http://activemq.org/config/1.0" > <!--
dataDirectory="${activemq.base}/data"> -->
+
+
+
+
+ <!-- The transport connectors ActiveMQ will listen to -->
+ <transportConnectors>
+ <transportConnector uri="tcp://localhost:62001"/>
+ </transportConnectors>
+
+ <!-- The store and forward broker networks ActiveMQ will listen to -->
+ <networkConnectors>
+ <!-- static, duplex network connection to the broker listening on tcp://localhost:62002 -->
+ <networkConnector name="monitoring" uri="static:(tcp://localhost:62002)"
duplex="true" >
+ <excludedDestinations>
+ <queue physicalName="exclude.test.foo"/>
+ <topic physicalName="exclude.test.bar"/>
+ </excludedDestinations>
+ <dynamicallyIncludedDestinations>
+ <queue physicalName="include.test.foo"/>
+ <topic physicalName="include.test.bar"/>
+ </dynamicallyIncludedDestinations>
+ <staticallyIncludedDestinations>
+ <queue physicalName="always.include.queue"/>
+ <topic physicalName="always.include.topic"/>
+ </staticallyIncludedDestinations>
+
+
+
+ </networkConnector>
+ </networkConnectors>
+
+
+
+ </broker>
+
+
+</beans>
+
+