Author: kwall
Date: Tue Jun 17 21:37:29 2014
New Revision: 1603294

URL: http://svn.apache.org/r1603294
Log:
QPID-5785: [Java Broker] Fix NPE when a topic exchange is closed after a 
message filter has been added to an existing binding.

This corrects a regression introduced by QPID-5709 that was causing an NPE to 
appear in the test log for QueueBindingTest.

Modified:
    
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
    
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java

Modified: 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java?rev=1603294&r1=1603293&r2=1603294&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
 (original)
+++ 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
 Tue Jun 17 21:37:29 2014
@@ -76,8 +76,10 @@ public class TopicExchange extends Abstr
         assert queue != null;
         assert bindingKey != null;
 
-        _logger.debug("Updating binding of queue " + queue.getName() + " with 
routing key " + bindingKey);
-
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Updating binding of queue " + queue.getName() + " 
with routing key " + bindingKey);
+        }
 
         String routingKey = TopicNormalizer.normalize(bindingKey);
 
@@ -87,6 +89,7 @@ public class TopicExchange extends Abstr
             if (_bindings.containsKey(binding))
             {
                 Map<String, Object> oldArgs = _bindings.get(binding);
+                _bindings.put(binding, args);
                 TopicExchangeResult result = 
_topicExchangeResults.get(routingKey);
 
                 if (FilterSupport.argumentsContainFilter(args))
@@ -136,8 +139,10 @@ public class TopicExchange extends Abstr
         assert queue != null;
         assert bindingKey != null;
 
-        _logger.debug("Registering queue " + queue.getName() + " with routing 
key " + bindingKey);
-
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Registering queue " + queue.getName() + " with 
routing key " + bindingKey);
+        }
 
         String routingKey = TopicNormalizer.normalize(bindingKey);
 
@@ -252,6 +257,12 @@ public class TopicExchange extends Abstr
         if(_bindings.containsKey(binding))
         {
             Map<String,Object> bindingArgs = _bindings.remove(binding);
+
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("deregisterQueue " + bindingArgs);
+            }
+
             String bindingKey = 
TopicNormalizer.normalize(binding.getBindingKey());
             TopicExchangeResult result = _topicExchangeResults.get(bindingKey);
 

Modified: 
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java?rev=1603294&r1=1603293&r2=1603294&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
 (original)
+++ 
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
 Tue Jun 17 21:37:29 2014
@@ -20,9 +20,11 @@
  */
 package org.apache.qpid.server.exchange;
 
+import static org.apache.qpid.common.AMQPFilterTypes.*;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -31,6 +33,7 @@ import java.util.UUID;
 import org.junit.Assert;
 
 import org.apache.qpid.server.binding.BindingImpl;
+import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
@@ -64,6 +67,7 @@ public class TopicExchangeTest extends Q
         attributes.put(Exchange.DURABLE, false);
 
         _exchange = new TopicExchange(attributes, _vhost);
+        _exchange.open();
     }
 
     @Override
@@ -94,7 +98,7 @@ public class TopicExchangeTest extends Q
     public void testNoRoute() throws Exception
     {
         AMQQueue<?> queue = createQueue("a*#b");
-        _exchange.registerQueue(createBinding(UUID.randomUUID(), "a.*.#.b", 
queue, _exchange, null));
+        createBinding(UUID.randomUUID(), "a.*.#.b", queue, _exchange, null);
 
 
         routeMessage("a.b", 0l);
@@ -105,7 +109,7 @@ public class TopicExchangeTest extends Q
     public void testDirectMatch() throws Exception
     {
         AMQQueue<?> queue = createQueue("ab");
-        _exchange.registerQueue(createBinding(UUID.randomUUID(), "a.b", queue, 
_exchange, null));
+        createBinding(UUID.randomUUID(), "a.b", queue, _exchange, null);
 
 
         routeMessage("a.b",0l);
@@ -127,7 +131,7 @@ public class TopicExchangeTest extends Q
     public void testStarMatch() throws Exception
     {
         AMQQueue<?> queue = createQueue("a*");
-        _exchange.registerQueue(createBinding(UUID.randomUUID(), "a.*", queue, 
_exchange, null));
+        createBinding(UUID.randomUUID(), "a.*", queue, _exchange, null);
 
 
         routeMessage("a.b",0l);
@@ -158,7 +162,7 @@ public class TopicExchangeTest extends Q
     public void testHashMatch() throws Exception
     {
         AMQQueue<?> queue = createQueue("a#");
-        _exchange.registerQueue(createBinding(UUID.randomUUID(), "a.#", queue, 
_exchange, null));
+        createBinding(UUID.randomUUID(), "a.#", queue, _exchange, null);
 
 
         routeMessage("a.b.c",0l);
@@ -209,7 +213,7 @@ public class TopicExchangeTest extends Q
     public void testMidHash() throws Exception
     {
         AMQQueue<?> queue = createQueue("a");
-        _exchange.registerQueue(createBinding(UUID.randomUUID(), "a.*.#.b", 
queue, _exchange, null));
+        createBinding(UUID.randomUUID(), "a.*.#.b", queue, _exchange, null);
 
         routeMessage("a.c.d.b",0l);
 
@@ -234,7 +238,7 @@ public class TopicExchangeTest extends Q
     public void testMatchAfterHash() throws Exception
     {
         AMQQueue<?> queue = createQueue("a#");
-        _exchange.registerQueue(createBinding(UUID.randomUUID(), "a.*.#.b.c", 
queue, _exchange, null));
+        createBinding(UUID.randomUUID(), "a.*.#.b.c", queue, _exchange, null);
 
 
         int queueCount = routeMessage("a.c.b.b",0l);
@@ -272,11 +276,11 @@ public class TopicExchangeTest extends Q
     public void testHashAfterHash() throws Exception
     {
         AMQQueue<?> queue = createQueue("a#");
-        _exchange.registerQueue(createBinding(UUID.randomUUID(),
-                                                              "a.*.#.b.c.#.d",
-                                                              queue,
-                                                              _exchange,
-                                                              null));
+        createBinding(UUID.randomUUID(),
+                      "a.*.#.b.c.#.d",
+                      queue,
+                      _exchange,
+                      null);
 
         int queueCount = routeMessage("a.c.b.b.c",0l);
         Assert.assertEquals("Message should not route to any queues", 0, 
queueCount);
@@ -297,7 +301,7 @@ public class TopicExchangeTest extends Q
     public void testHashHash() throws Exception
     {
         AMQQueue<?> queue = createQueue("a#");
-        _exchange.registerQueue(createBinding(UUID.randomUUID(), "a.#.*.#.d", 
queue, _exchange, null));
+        createBinding(UUID.randomUUID(), "a.#.*.#.d", queue, _exchange, null);
 
         int queueCount = routeMessage("a.c.b.b.c",0l);
         Assert.assertEquals("Message should not route to any queues", 0, 
queueCount);
@@ -318,7 +322,7 @@ public class TopicExchangeTest extends Q
     public void testSubMatchFails() throws Exception
     {
         AMQQueue<?> queue = createQueue("a");
-        _exchange.registerQueue(createBinding(UUID.randomUUID(), "a.b.c.d", 
queue, _exchange, null));
+        createBinding(UUID.randomUUID(), "a.b.c.d", queue, _exchange, null);
 
         int queueCount = routeMessage("a.b.c",0l);
         Assert.assertEquals("Message should not route to any queues", 0, 
queueCount);
@@ -327,27 +331,10 @@ public class TopicExchangeTest extends Q
 
     }
 
-    private int routeMessage(String routingKey, long messageNumber)
-    {
-        ServerMessage message = mock(ServerMessage.class);
-        when(message.getInitialRoutingAddress()).thenReturn(routingKey);
-        List<? extends BaseQueue> queues = _exchange.route(message, 
routingKey, InstanceProperties.EMPTY);
-        MessageReference ref = mock(MessageReference.class);
-        when(ref.getMessage()).thenReturn(message);
-        when(message.newReference()).thenReturn(ref);
-        when(message.getMessageNumber()).thenReturn(messageNumber);
-        for(BaseQueue q : queues)
-        {
-            q.enqueue(message, null);
-        }
-
-        return queues.size();
-    }
-
     public void testMoreRouting() throws Exception
     {
         AMQQueue<?> queue = createQueue("a");
-        _exchange.registerQueue(createBinding(UUID.randomUUID(), "a.b", queue, 
_exchange, null));
+       createBinding(UUID.randomUUID(), "a.b", queue, _exchange, null);
 
 
         int queueCount = routeMessage("a.b.c",0l);
@@ -360,7 +347,7 @@ public class TopicExchangeTest extends Q
     public void testMoreQueue() throws Exception
     {
         AMQQueue<?> queue = createQueue("a");
-        _exchange.registerQueue(createBinding(UUID.randomUUID(), "a.b", queue, 
_exchange, null));
+        createBinding(UUID.randomUUID(), "a.b", queue, _exchange, null);
 
 
         int queueCount = routeMessage("a",0l);
@@ -370,11 +357,119 @@ public class TopicExchangeTest extends Q
 
     }
 
-    private static BindingImpl createBinding(UUID id,
-                                                final String bindingKey,
-                                                final AMQQueue queue,
-                                                final ExchangeImpl exchange,
-                                                final Map<String, Object> 
arguments)
+    public void testRouteWithJMSSelector() throws Exception
+    {
+        AMQQueue<?> queue = createQueue("queue1");
+        final String bindingKey = "bindingKey";
+
+        Map<String, Object> bindArgs = Collections.<String, 
Object>singletonMap(JMS_SELECTOR.toString(), "arg > 5");
+        createBinding(UUID.randomUUID(), bindingKey, queue, _exchange, 
bindArgs);
+
+        ServerMessage matchMsg1 = mock(ServerMessage.class);
+        AMQMessageHeader msgHeader1 = createMessageHeader(Collections.<String, 
Object>singletonMap("arg", 6));
+        when(matchMsg1.getMessageHeader()).thenReturn(msgHeader1);
+        routeMessage(matchMsg1, bindingKey, 1);
+        Assert.assertEquals("First message should be routed to queue", 1, 
queue.getQueueDepthMessages());
+
+        ServerMessage nonmatchMsg2 = mock(ServerMessage.class);
+        AMQMessageHeader msgHeader2 = createMessageHeader(Collections.<String, 
Object>singletonMap("arg", 5));
+        when(nonmatchMsg2.getMessageHeader()).thenReturn(msgHeader2);
+        routeMessage(nonmatchMsg2, bindingKey, 2);
+        Assert.assertEquals("Second message should not be routed to queue", 1, 
queue.getQueueDepthMessages());
+
+        ServerMessage nonmatchMsg3 = mock(ServerMessage.class);
+        AMQMessageHeader msgHeader3 = createMessageHeader(Collections.<String, 
Object>emptyMap());
+        when(nonmatchMsg3.getMessageHeader()).thenReturn(msgHeader3);
+        routeMessage(nonmatchMsg3, bindingKey, 3);
+        Assert.assertEquals("Third message should not be routed to queue", 1, 
queue.getQueueDepthMessages());
+
+        ServerMessage matchMsg4 = mock(ServerMessage.class);
+        AMQMessageHeader msgHeader4 = createMessageHeader(Collections.<String, 
Object>singletonMap("arg", 7));
+        when(matchMsg4.getMessageHeader()).thenReturn(msgHeader4);
+        routeMessage(matchMsg4, bindingKey, 4);
+        Assert.assertEquals("First message should be routed to queue", 2, 
queue.getQueueDepthMessages());
+
+    }
+
+    public void testUpdateBindingReplacingSelector() throws Exception
+    {
+        AMQQueue<?> queue = createQueue("queue1");
+        final String bindingKey = "a";
+
+        Map<String, Object> originalArgs = Collections.<String, 
Object>singletonMap(JMS_SELECTOR.toString(), "arg > 5");
+        createBinding(UUID.randomUUID(), bindingKey, queue, _exchange, 
originalArgs);
+
+        AMQMessageHeader mgsHeader1 = createMessageHeader(Collections.<String, 
Object>singletonMap("arg", 6));
+        ServerMessage msg1 = mock(ServerMessage.class);
+        when(msg1.getMessageHeader()).thenReturn(mgsHeader1);
+
+        routeMessage(msg1, bindingKey, 1);
+        Assert.assertEquals(1, queue.getQueueDepthMessages());
+
+        // Update the binding
+        Map<String, Object> newArgs = Collections.<String, 
Object>singletonMap(JMS_SELECTOR.toString(), "arg > 6");
+        _exchange.replaceBinding(bindingKey, queue, newArgs);
+
+        // Message that would have matched the original selector but not the 
new
+        AMQMessageHeader mgsHeader2 = createMessageHeader(Collections.<String, 
Object>singletonMap("arg", 6));
+        ServerMessage msg2 = mock(ServerMessage.class);
+        when(msg2.getMessageHeader()).thenReturn(mgsHeader2);
+
+        routeMessage(msg2, bindingKey, 2);
+        Assert.assertEquals(1, queue.getQueueDepthMessages());
+
+        // Message that matches only the second
+        AMQMessageHeader mgsHeader3 = createMessageHeader(Collections.<String, 
Object>singletonMap("arg", 7));
+        ServerMessage msg3 = mock(ServerMessage.class);
+        when(msg3.getMessageHeader()).thenReturn(mgsHeader3);
+
+        routeMessage(msg3, bindingKey, 2);
+        Assert.assertEquals(2, queue.getQueueDepthMessages());
+
+    }
+
+    // This demonstrates QPID-5785.  Deleting the exchange after this 
combination of binding
+    // updates generated a NPE
+    public void testUpdateBindingAddingSelector() throws Exception
+    {
+        AMQQueue<?> queue = createQueue("queue1");
+        final String bindingKey = "a";
+
+        BindingImpl binding = createBinding(UUID.randomUUID(), bindingKey, 
queue, _exchange, null);
+
+        ServerMessage msg1 = mock(ServerMessage.class);
+
+        routeMessage(msg1, bindingKey, 1);
+        Assert.assertEquals(1, queue.getQueueDepthMessages());
+
+        // Update the binding adding selector
+        Map<String, Object> newArgs = Collections.<String, 
Object>singletonMap(JMS_SELECTOR.toString(), "arg > 6");
+        _exchange.replaceBinding(bindingKey, queue, newArgs);
+
+        // Message that does not match the new selector
+        AMQMessageHeader mgsHeader2 = createMessageHeader(Collections.<String, 
Object>singletonMap("arg", 6));
+        ServerMessage msg2 = mock(ServerMessage.class);
+        when(msg2.getMessageHeader()).thenReturn(mgsHeader2);
+
+        routeMessage(msg2, bindingKey, 2);
+        Assert.assertEquals(1, queue.getQueueDepthMessages());
+
+        // Message that matches the selector
+        AMQMessageHeader mgsHeader3 = createMessageHeader(Collections.<String, 
Object>singletonMap("arg", 7));
+        ServerMessage msg3 = mock(ServerMessage.class);
+        when(msg3.getMessageHeader()).thenReturn(mgsHeader3);
+
+        routeMessage(msg3, bindingKey, 2);
+        Assert.assertEquals(2, queue.getQueueDepthMessages());
+
+        _exchange.delete();
+    }
+
+    private BindingImpl createBinding(UUID id,
+                                      String bindingKey,
+                                      AMQQueue queue,
+                                      ExchangeImpl exchange,
+                                      Map<String, Object> arguments)
     {
         Map<String, Object> attributes = new HashMap<String, Object>();
         attributes.put(Binding.NAME, bindingKey);
@@ -383,10 +478,47 @@ public class TopicExchangeTest extends Q
             attributes.put(Binding.ARGUMENTS, arguments);
         }
         attributes.put(Binding.ID, id);
-        BindingImpl binding = new BindingImpl(attributes, queue, exchange);
+
+        BindingImpl binding = (BindingImpl) 
_vhost.getObjectFactory().create(Binding.class, attributes, queue, exchange);
         binding.open();
         return binding;
     }
 
+    private int routeMessage(String routingKey, long messageNumber)
+    {
+        ServerMessage message = mock(ServerMessage.class);
+        return routeMessage(message, routingKey, messageNumber);
+    }
+
+    private int routeMessage(ServerMessage message, String routingKey, long 
messageNumber)
+    {
+        when(message.getInitialRoutingAddress()).thenReturn(routingKey);
+        List<? extends BaseQueue> queues = _exchange.route(message, 
routingKey, InstanceProperties.EMPTY);
+        MessageReference ref = mock(MessageReference.class);
+        when(ref.getMessage()).thenReturn(message);
+        when(message.newReference()).thenReturn(ref);
+        when(message.getMessageNumber()).thenReturn(messageNumber);
+        for(BaseQueue q : queues)
+        {
+            q.enqueue(message, null);
+        }
+
+        return queues.size();
+    }
+
+    private AMQMessageHeader createMessageHeader(Map<String, Object> headers)
+    {
+        AMQMessageHeader messageHeader = mock(AMQMessageHeader.class);
+        for(Map.Entry<String, Object> entry : headers.entrySet())
+        {
+            String key = entry.getKey();
+            Object value = entry.getValue();
+
+            when(messageHeader.containsHeader(key)).thenReturn(true);
+            when(messageHeader.getHeader(key)).thenReturn(value);
+        }
+        return messageHeader;
+    }
+
 
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to