Author: rgodfrey
Date: Fri Aug 16 10:22:07 2013
New Revision: 1514639

URL: http://svn.apache.org/r1514639
Log:
QPID-5073 : Add dependency on alternate exchange for queues where such an 
alternate is set

Modified:
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java
    
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java?rev=1514639&r1=1514638&r2=1514639&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java
 Fri Aug 16 10:22:07 2013
@@ -20,11 +20,13 @@
  */
 package org.apache.qpid.server.virtualhost;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.model.Queue;
@@ -62,21 +64,48 @@ public class QueueRecoverer extends Abst
 
     private class UnresolvedQueue implements UnresolvedObject<AMQQueue>
     {
+        private final Map<String, Object> _attributes;
+        private final UUID _alternateExchangeId;
+        private final UUID _id;
         private AMQQueue _queue;
+        private List<UnresolvedDependency> _dependencies = new 
ArrayList<UnresolvedDependency>();
+        private Exchange _alternateExchange;
 
         public UnresolvedQueue(final UUID id,
                                final String type,
-                               final Map<String, Object> attributeMap)
+                               final Map<String, Object> attributes)
         {
-            String queueName = (String) attributeMap.get(Queue.NAME);
-            String owner = (String) attributeMap.get(Queue.OWNER);
-            boolean exclusive = (Boolean) attributeMap.get(Queue.EXCLUSIVE);
-            UUID alternateExchangeId = 
attributeMap.get(Queue.ALTERNATE_EXCHANGE) == null ? null : 
UUID.fromString((String)attributeMap.get(Queue.ALTERNATE_EXCHANGE));
+            _attributes = attributes;
+            _alternateExchangeId = _attributes.get(Queue.ALTERNATE_EXCHANGE) 
== null ? null : UUID.fromString((String) _attributes
+                    .get(Queue.ALTERNATE_EXCHANGE));
+            _id = id;
+            if (_alternateExchangeId != null)
+            {
+                _alternateExchange = 
_exchangeRegistry.getExchange(_alternateExchangeId);
+                if(_alternateExchange == null)
+                {
+                    _dependencies.add(new AlternateExchangeDependency());
+                }
+            }
+        }
+
+        @Override
+        public UnresolvedDependency[] getUnresolvedDependencies()
+        {
+            return _dependencies.toArray(new 
UnresolvedDependency[_dependencies.size()]);
+        }
+
+        @Override
+        public AMQQueue resolve()
+        {
+            String queueName = (String) _attributes.get(Queue.NAME);
+            String owner = (String) _attributes.get(Queue.OWNER);
+            boolean exclusive = (Boolean) _attributes.get(Queue.EXCLUSIVE);
             @SuppressWarnings("unchecked")
-            Map<String, Object> queueArgumentsMap = (Map<String, Object>) 
attributeMap.get(Queue.ARGUMENTS);
+            Map<String, Object> queueArgumentsMap = (Map<String, Object>) 
_attributes.get(Queue.ARGUMENTS);
             try
             {
-                _queue = _virtualHost.getQueueRegistry().getQueue(id);
+                _queue = _virtualHost.getQueueRegistry().getQueue(_id);
                 if(_queue == null)
                 {
                     _queue = 
_virtualHost.getQueueRegistry().getQueue(queueName);
@@ -84,38 +113,43 @@ public class QueueRecoverer extends Abst
 
                 if (_queue == null)
                 {
-                    _queue = AMQQueueFactory.createAMQQueueImpl(id, queueName, 
true, owner, false, exclusive, _virtualHost,
-                                                           queueArgumentsMap);
+                    _queue = AMQQueueFactory.createAMQQueueImpl(_id, 
queueName, true, owner, false, exclusive, _virtualHost,
+                                                                
queueArgumentsMap);
                     _virtualHost.getQueueRegistry().registerQueue(_queue);
 
-                    if (alternateExchangeId != null)
+                    if (_alternateExchange != null)
                     {
-                        Exchange altExchange = 
_exchangeRegistry.getExchange(alternateExchangeId);
-                        if (altExchange == null)
-                        {
-                            _logger.error("Unknown exchange id " + 
alternateExchangeId + ", cannot set alternate exchange on queue with id " + id);
-                            return;
-                        }
-                        _queue.setAlternateExchange(altExchange);
+                        _queue.setAlternateExchange(_alternateExchange);
                     }
                 }
             }
             catch (AMQException e)
             {
-                throw new RuntimeException("Error recovering queue uuid " + id 
+ " name " + queueName, e);
+                throw new RuntimeException("Error recovering queue uuid " + 
_id + " name " + queueName, e);
             }
+            return _queue;
         }
 
-        @Override
-        public UnresolvedDependency[] getUnresolvedDependencies()
+        private class AlternateExchangeDependency implements 
UnresolvedDependency
         {
-            return new UnresolvedDependency[0];
-        }
+            @Override
+            public UUID getId()
+            {
+                return _alternateExchangeId;
+            }
 
-        @Override
-        public AMQQueue resolve()
-        {
-            return _queue;
+            @Override
+            public String getType()
+            {
+                return "Exchange";
+            }
+
+            @Override
+            public void resolve(final Object dependency)
+            {
+                _alternateExchange = (Exchange) dependency;
+                _dependencies.remove(this);
+            }
         }
     }
 }

Modified: 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java?rev=1514639&r1=1514638&r2=1514639&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
 Fri Aug 16 10:22:07 2013
@@ -28,7 +28,10 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.configuration.QueueConfiguration;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 import org.apache.qpid.server.exchange.DirectExchange;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.exchange.ExchangeFactory;
@@ -36,21 +39,27 @@ import org.apache.qpid.server.exchange.E
 import org.apache.qpid.server.exchange.HeadersExchange;
 import org.apache.qpid.server.exchange.TopicExchange;
 import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.RootMessageLogger;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.model.Binding;
+import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.plugin.ExchangeType;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.security.*;
+import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.store.ConfiguredObjectRecord;
 import org.apache.qpid.server.store.DurableConfigurationRecoverer;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.DurableConfiguredObjectRecoverer;
 import org.apache.qpid.test.utils.QpidTestCase;
+import org.mockito.ArgumentCaptor;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
@@ -73,6 +82,7 @@ public class DurableConfigurationRecover
     private DurableConfigurationStore _store;
     private ExchangeFactory _exchangeFactory;
     private ExchangeRegistry _exchangeRegistry;
+    private QueueRegistry _queueRegistry;
 
     @Override
     public void setUp() throws Exception
@@ -95,10 +105,57 @@ public class DurableConfigurationRecover
         
when(_exchangeRegistry.getExchange(eq(DIRECT_EXCHANGE_ID))).thenReturn(_directExchange);
         
when(_exchangeRegistry.getExchange(eq(TOPIC_EXCHANGE_ID))).thenReturn(_topicExchange);
 
-        QueueRegistry queueRegistry = mock(QueueRegistry.class);
-        when(_vhost.getQueueRegistry()).thenReturn(queueRegistry);
 
-        when(queueRegistry.getQueue(eq(QUEUE_ID))).thenReturn(queue);
+        final ArgumentCaptor<Exchange> registeredExchange = 
ArgumentCaptor.forClass(Exchange.class);
+        doAnswer(new Answer()
+        {
+
+            @Override
+            public Object answer(final InvocationOnMock invocation) throws 
Throwable
+            {
+                Exchange exchange = registeredExchange.getValue();
+                
when(_exchangeRegistry.getExchange(exchange.getId())).thenReturn(exchange);
+                
when(_exchangeRegistry.getExchange(exchange.getName())).thenReturn(exchange);
+                return null;
+            }
+        
}).when(_exchangeRegistry).registerExchange(registeredExchange.capture());
+
+
+
+        _queueRegistry = mock(QueueRegistry.class);
+        when(_vhost.getQueueRegistry()).thenReturn(_queueRegistry);
+
+        when(_queueRegistry.getQueue(eq(QUEUE_ID))).thenReturn(queue);
+
+        final ArgumentCaptor<AMQQueue> registeredQueue = 
ArgumentCaptor.forClass(AMQQueue.class);
+        doAnswer(new Answer()
+        {
+
+            @Override
+            public Object answer(final InvocationOnMock invocation) throws 
Throwable
+            {
+                AMQQueue queue = registeredQueue.getValue();
+                when(_queueRegistry.getQueue(queue.getId())).thenReturn(queue);
+                
when(_queueRegistry.getQueue(queue.getName())).thenReturn(queue);
+                return null;
+            }
+        }).when(_queueRegistry).registerQueue(registeredQueue.capture());
+
+        /* These lines necessary to get queue creation to work because 
AMQQueueFactory is called directly rather than
+           queue creation being on vhost - yuck! */
+        SecurityManager securityManager = mock(SecurityManager.class);
+        when(_vhost.getSecurityManager()).thenReturn(securityManager);
+        
when(securityManager.authoriseCreateQueue(anyBoolean(),anyBoolean(),anyBoolean(),anyBoolean(),anyBoolean(),
+                                                  
any(AMQShortString.class),anyString())).thenReturn(true);
+        VirtualHostConfiguration configuration = 
mock(VirtualHostConfiguration.class);
+        when(_vhost.getConfiguration()).thenReturn(configuration);
+        QueueConfiguration queueConfiguration = mock(QueueConfiguration.class);
+        
when(configuration.getQueueConfiguration(anyString())).thenReturn(queueConfiguration);
+        LogActor logActor = mock(LogActor.class);
+        CurrentActor.set(logActor);
+        RootMessageLogger rootLogger = mock(RootMessageLogger.class);
+        when(logActor.getRootMessageLogger()).thenReturn(rootLogger);
+        /* end of queue creation mock hackery */
 
         _exchangeFactory = mock(ExchangeFactory.class);
 
@@ -208,15 +265,6 @@ public class DurableConfigurationRecover
                                              
eq(HeadersExchange.TYPE.getType()),
                                              anyBoolean(),
                                              
anyBoolean())).thenReturn(customExchange);
-        doAnswer(new Answer()
-        {
-            @Override
-            public Object answer(final InvocationOnMock invocation) throws 
Throwable
-            {
-                
when(_exchangeRegistry.getExchange(eq(customExchangeId))).thenReturn(customExchange);
-                return null;
-            }
-        }).when(_exchangeRegistry).registerExchange(customExchange);
 
         final ConfiguredObjectRecord[] expected = {
                 new ConfiguredObjectRecord(new UUID(1, 0), 
"org.apache.qpid.server.model.Binding",
@@ -318,6 +366,35 @@ public class DurableConfigurationRecover
 
     }
 
+    public void testRecoveryOfQueueAlternateExchange() throws Exception
+    {
+
+        final UUID queueId = new UUID(1, 0);
+        final UUID exchangeId = new UUID(2, 0);
+
+
+
+        final Exchange customExchange = mock(Exchange.class);
+
+        when(_exchangeFactory.createExchange(eq(exchangeId),
+                                             eq(CUSTOM_EXCHANGE_NAME),
+                                             
eq(HeadersExchange.TYPE.getType()),
+                                             anyBoolean(),
+                                             
anyBoolean())).thenReturn(customExchange);
+
+        _durableConfigurationRecoverer.beginConfigurationRecovery(_store, 2);
+
+        _durableConfigurationRecoverer.configuredObject(queueId, 
Queue.class.getSimpleName(),
+                                                        
createQueue("testQueue", exchangeId));
+        _durableConfigurationRecoverer.configuredObject(exchangeId,
+                                                        
org.apache.qpid.server.model.Exchange.class.getSimpleName(),
+                                                        
createExchange(CUSTOM_EXCHANGE_NAME, HeadersExchange.TYPE));
+
+        _durableConfigurationRecoverer.completeConfigurationRecovery();
+
+        assertEquals(_queueRegistry.getQueue(queueId).getAlternateExchange(), 
customExchange);
+    }
+
     private void verifyCorrectUpdates(final ConfiguredObjectRecord[] expected) 
throws AMQStoreException
     {
         doAnswer(new Answer()
@@ -373,4 +450,21 @@ public class DurableConfigurationRecover
         return exchange;
 
     }
+
+
+    private Map<String, Object> createQueue(String name, UUID 
alternateExchangeId)
+    {
+        Map<String, Object> queue = new LinkedHashMap<String, Object>();
+
+        queue.put(Queue.NAME, name);
+        if(alternateExchangeId != null)
+        {
+            queue.put(Queue.ALTERNATE_EXCHANGE, 
alternateExchangeId.toString());
+        }
+        queue.put(Queue.EXCLUSIVE, false);
+
+        return queue;
+
+    }
+
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to