Author: rajdavies
Date: Sat May 23 07:49:39 2009
New Revision: 777821

URL: http://svn.apache.org/viewvc?rev=777821&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1629

Added:
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DemandForwardingBridgeFilterTest.java
   (with props)
Modified:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=777821&r1=777820&r2=777821&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
 Sat May 23 07:49:39 2009
@@ -435,12 +435,23 @@
                     remoteBrokerInfo = (BrokerInfo)command;
                     Properties props = 
MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties());
                     try {
-                       IntrospectionSupport.getProperties(configuration, 
props, null);
-                       excludedDestinations = 
configuration.getExcludedDestinations().toArray(new 
ActiveMQDestination[configuration.getExcludedDestinations().size()]);
-                       staticallyIncludedDestinations = 
configuration.getStaticallyIncludedDestinations().toArray(new 
ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]);
-                       dynamicallyIncludedDestinations = 
configuration.getDynamicallyIncludedDestinations().toArray(new 
ActiveMQDestination[configuration.getDynamicallyIncludedDestinations().size()]);
+                        IntrospectionSupport.getProperties(configuration, 
props, null);
+                        if (configuration.getExcludedDestinations() != null) {
+                            excludedDestinations = 
configuration.getExcludedDestinations().toArray(
+                                    new 
ActiveMQDestination[configuration.getExcludedDestinations().size()]);
+                        }
+                        if (configuration.getStaticallyIncludedDestinations() 
!= null) {
+                            staticallyIncludedDestinations = 
configuration.getStaticallyIncludedDestinations().toArray(
+                                    new 
ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]);
+                        }
+                        if (configuration.getDynamicallyIncludedDestinations() 
!= null) {
+                            dynamicallyIncludedDestinations = 
configuration.getDynamicallyIncludedDestinations()
+                                    .toArray(
+                                            new 
ActiveMQDestination[configuration.getDynamicallyIncludedDestinations()
+                                                    .size()]);
+                        }
                     } catch (Throwable t) {
-                       LOG.error("Error mapping remote destinations", t);
+                        LOG.error("Error mapping remote destinations", t);
                     }
                     serviceRemoteBrokerInfo(command);
                     // Let the local broker know the remote broker's ID.
@@ -878,19 +889,21 @@
                }
         } 
 
-        DestinationFilter filter = DestinationFilter.parseFilter(destination);
+        final DestinationFilter filter = 
DestinationFilter.parseFilter(destination);
+        
         ActiveMQDestination[] dests = excludedDestinations;
         if (dests != null && dests.length > 0) {
             for (int i = 0; i < dests.length; i++) {
+                DestinationFilter exclusionFilter = filter;
                 ActiveMQDestination match = dests[i];
-                if (filter instanceof 
org.apache.activemq.filter.SimpleDestinationFilter) {
+                if (exclusionFilter instanceof 
org.apache.activemq.filter.SimpleDestinationFilter) {
                     DestinationFilter newFilter = 
DestinationFilter.parseFilter(match);
                     if (!(newFilter instanceof 
org.apache.activemq.filter.SimpleDestinationFilter)) {
-                        filter = newFilter;
+                        exclusionFilter = newFilter;
                         match = destination;
                     }
                 }
-                if (match != null && filter.matches(match)) {
+                if (match != null && exclusionFilter.matches(match) && 
dests[i].getDestinationType() == destination.getDestinationType()) {
                     return false;
                 }
             }
@@ -898,15 +911,16 @@
         dests = dynamicallyIncludedDestinations;
         if (dests != null && dests.length > 0) {
             for (int i = 0; i < dests.length; i++) {
+                DestinationFilter inclusionFilter = filter;
                 ActiveMQDestination match = dests[i];
-                if (filter instanceof 
org.apache.activemq.filter.SimpleDestinationFilter) {
+                if (inclusionFilter instanceof 
org.apache.activemq.filter.SimpleDestinationFilter) {
                     DestinationFilter newFilter = 
DestinationFilter.parseFilter(match);
                     if (!(newFilter instanceof 
org.apache.activemq.filter.SimpleDestinationFilter)) {
-                        filter = newFilter;
+                        inclusionFilter = newFilter;
                         match = destination;
                     }
                 }
-                if (match != null && filter.matches(match)) {
+                if (match != null && inclusionFilter.matches(match) && 
dests[i].getDestinationType() == destination.getDestinationType()) {
                     return true;
                 }
             }

Added: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DemandForwardingBridgeFilterTest.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DemandForwardingBridgeFilterTest.java?rev=777821&view=auto
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DemandForwardingBridgeFilterTest.java
 (added)
+++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DemandForwardingBridgeFilterTest.java
 Sat May 23 07:49:39 2009
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import junit.framework.Test;
+
+import org.apache.activemq.broker.StubConnection;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.SessionInfo;
+
+
+public class DemandForwardingBridgeFilterTest extends NetworkTestSupport {
+
+    private DemandForwardingBridge bridge;
+
+    private StubConnection producerConnection;
+
+    private ProducerInfo producerInfo;
+
+    private StubConnection consumerConnection;
+
+    private SessionInfo consumerSessionInfo;
+
+    public void testWildcardOnExcludedDestination() throws Exception {
+
+        bridge.setExcludedDestinations(new ActiveMQDestination[] { 
ActiveMQDestination.createDestination("OTHER.>",
+            ActiveMQDestination.TOPIC_TYPE) });
+        bridge.setDynamicallyIncludedDestinations(new ActiveMQDestination[] { 
ActiveMQDestination.createDestination(
+            "TEST", ActiveMQDestination.QUEUE_TYPE) });
+        bridge.start();
+
+        assertReceiveMessageOn("TEST", ActiveMQDestination.QUEUE_TYPE);
+        assertReceiveNoMessageOn("OTHER.T1", ActiveMQDestination.TOPIC_TYPE);
+    }
+
+    public void testWildcardOnTwoExcludedDestination() throws Exception {
+
+        bridge.setExcludedDestinations(new ActiveMQDestination[] {
+                ActiveMQDestination.createDestination("OTHER.>", 
ActiveMQDestination.QUEUE_TYPE),
+                ActiveMQDestination.createDestination("TEST.X1", 
ActiveMQDestination.QUEUE_TYPE) });
+        bridge.setDynamicallyIncludedDestinations(new ActiveMQDestination[] { 
ActiveMQDestination.createDestination(
+            "TEST.X2", ActiveMQDestination.QUEUE_TYPE) });
+        bridge.start();
+
+        assertReceiveMessageOn("TEST.X2", ActiveMQDestination.QUEUE_TYPE);
+        assertReceiveNoMessageOn("OTHER.X1", ActiveMQDestination.QUEUE_TYPE);
+        assertReceiveNoMessageOn("TEST.X1", ActiveMQDestination.QUEUE_TYPE);
+    }
+
+    public void testWildcardOnDynamicallyIncludedDestination() throws 
Exception {
+
+        bridge.setDynamicallyIncludedDestinations(new ActiveMQDestination[] {
+                ActiveMQDestination.createDestination("OTHER.>", 
ActiveMQDestination.QUEUE_TYPE),
+                ActiveMQDestination.createDestination("TEST.X2", 
ActiveMQDestination.QUEUE_TYPE) });
+        bridge.start();
+
+        assertReceiveMessageOn("OTHER.X1", ActiveMQDestination.QUEUE_TYPE);
+        assertReceiveMessageOn("TEST.X2", ActiveMQDestination.QUEUE_TYPE);
+    }
+
+    public void testDistinctTopicAndQueue() throws Exception {
+
+        bridge.setExcludedDestinations(new ActiveMQDestination[] { 
ActiveMQDestination.createDestination(">",
+            ActiveMQDestination.TOPIC_TYPE) });
+        bridge.setDynamicallyIncludedDestinations(new ActiveMQDestination[] { 
ActiveMQDestination.createDestination(
+            ">", ActiveMQDestination.QUEUE_TYPE) });
+        bridge.start();
+
+        assertReceiveMessageOn("TEST", ActiveMQDestination.QUEUE_TYPE);
+        assertReceiveNoMessageOn("TEST", ActiveMQDestination.TOPIC_TYPE);
+    }
+
+    public void testListOfExcludedDestinationWithWildcard() throws Exception {
+
+        bridge.setExcludedDestinations(new ActiveMQDestination[] {
+                ActiveMQDestination.createDestination("OTHER.>", 
ActiveMQDestination.TOPIC_TYPE),
+                ActiveMQDestination.createDestination("TEST.*", 
ActiveMQDestination.TOPIC_TYPE) });
+
+        bridge.setDynamicallyIncludedDestinations(new ActiveMQDestination[] { 
ActiveMQDestination.createDestination(
+            "TEST.X1", ActiveMQDestination.QUEUE_TYPE) });
+
+        bridge.start();
+
+        assertReceiveMessageOn("TEST.X1", ActiveMQDestination.QUEUE_TYPE);
+        assertReceiveNoMessageOn("OTHER.T1", ActiveMQDestination.TOPIC_TYPE);
+        assertReceiveNoMessageOn("OTHER.T2", ActiveMQDestination.TOPIC_TYPE);
+    }
+
+    private void assertReceiveMessageOn(String destinationName, byte 
destinationType) throws Exception,
+            InterruptedException {
+
+        ActiveMQDestination destination = 
ActiveMQDestination.createDestination(destinationName, destinationType);
+
+        // Send the message to the local broker.
+        producerConnection.send(createMessage(producerInfo, destination, 
destinationType));
+
+        // Make sure the message was delivered via the remote.
+        Message m = createConsumerAndReceiveMessage(destination);
+
+        assertNotNull(m);
+    }
+
+    private void assertReceiveNoMessageOn(String destinationName, byte 
destinationType) throws Exception,
+            InterruptedException {
+
+        ActiveMQDestination destination = 
ActiveMQDestination.createDestination(destinationName, destinationType);
+
+        // Send the message to the local broker.
+        producerConnection.send(createMessage(producerInfo, destination, 
destinationType));
+
+        // Make sure the message was NOT delivered via the remote.
+        Message m = createConsumerAndReceiveMessage(destination);
+        assertNull(m);
+    }
+
+    private Message createConsumerAndReceiveMessage(ActiveMQDestination 
destination) throws Exception {
+        // Now create remote consumer that should cause message to move to this
+        // remote consumer.
+        ConsumerInfo consumerInfo = createConsumerInfo(consumerSessionInfo, 
destination);
+        consumerConnection.send(consumerInfo);
+
+        Message m = receiveMessage(consumerConnection);
+        return m;
+    }
+
+    protected void setUp() throws Exception {
+        super.setUp();
+        NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
+        config.setBrokerName("local");
+        config.setDispatchAsync(false);
+        bridge = new DemandForwardingBridge(config, createTransport(), 
createRemoteTransport());
+        bridge.setBrokerService(broker);
+
+        producerConnection = createConnection();
+        ConnectionInfo producerConnectionInfo = createConnectionInfo();
+        SessionInfo producerSessionInfo = 
createSessionInfo(producerConnectionInfo);
+        producerInfo = createProducerInfo(producerSessionInfo);
+        producerConnection.send(producerConnectionInfo);
+        producerConnection.send(producerSessionInfo);
+        producerConnection.send(producerInfo);
+
+        consumerConnection = createRemoteConnection();
+        ConnectionInfo consumerConnectionInfo = createConnectionInfo();
+        consumerSessionInfo = createSessionInfo(consumerConnectionInfo);
+        consumerConnection.send(consumerConnectionInfo);
+        consumerConnection.send(consumerSessionInfo);
+    }
+
+    protected void tearDown() throws Exception {
+        bridge.stop();
+        super.tearDown();
+    }
+
+    public static Test suite() {
+        return suite(DemandForwardingBridgeFilterTest.class);
+    }
+
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+}
\ No newline at end of file

Propchange: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DemandForwardingBridgeFilterTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DemandForwardingBridgeFilterTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain


Reply via email to