Author: rgodfrey
Date: Fri Apr 18 16:03:11 2014
New Revision: 1588501

URL: http://svn.apache.org/r1588501
Log:
QPID-5710 : [Java Broker] Use common creation/recovery mechanism for Bindings

Added:
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingFactory.java
Modified:
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java
    qpid/trunk/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java

Added: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingFactory.java?rev=1588501&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingFactory.java (added)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingFactory.java Fri Apr 18 16:03:11 2014
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.binding;
+
+import java.util.Map;
+
+import org.apache.qpid.server.exchange.ExchangeImpl;
+import org.apache.qpid.server.model.AbstractConfiguredObjectTypeFactory;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.queue.AMQQueue;
+
+public class BindingFactory extends AbstractConfiguredObjectTypeFactory<BindingImpl>
+{
+    public BindingFactory()
+    {
+        super(BindingImpl.class);
+    }
+
+    @Override
+    protected BindingImpl createInstance(final Map<String, Object> attributes, final ConfiguredObject<?>... parents)
+    {
+        ExchangeImpl<?> exchange = (ExchangeImpl<?>) getParent(Exchange.class, parents);
+        AMQQueue<?> queue = (AMQQueue<?>) getParent(Queue.class, parents);
+        BindingImpl binding = new BindingImpl(attributes, queue, exchange);
+        exchange.addBinding(binding);
+        return binding;
+    }
+}

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java?rev=1588501&r1=1588500&r2=1588501&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java Fri Apr 18 16:03:11 2014
@@ -40,6 +40,7 @@ import org.apache.qpid.server.model.Queu
 import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
 import org.apache.qpid.server.util.StateChangeListener;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 
@@ -81,7 +82,11 @@ public class BindingImpl
 
     public BindingImpl(UUID id, Map<String, Object> attributes, AMQQueue queue, ExchangeImpl exchange)
     {
-        super(parentsMap(queue,exchange),enhanceWithDurable(combineIdWithAttributes(id, attributes), queue, exchange),queue.getVirtualHost().getTaskExecutor());
+        this(enhanceWithDurable(combineIdWithAttributes(id,attributes), queue, exchange), queue, exchange);
+    }
+    public BindingImpl(Map<String, Object> attributes, AMQQueue queue, ExchangeImpl exchange)
+    {
+        super(parentsMap(queue,exchange),attributes,queue.getVirtualHost().getTaskExecutor());
         _bindingKey = (String)attributes.get(org.apache.qpid.server.model.Binding.NAME);
         _queue = queue;
         _exchange = exchange;
@@ -99,6 +104,17 @@ public class BindingImpl
 
     }
 
+    @Override
+    protected void onCreate()
+    {
+        super.onCreate();
+        if (isDurable())
+        {
+            DurableConfigurationStoreHelper.createBinding(_queue.getVirtualHost().getDurableConfigurationStore(), this);
+        }
+
+    }
+
     private static Map<String, Object> enhanceWithDurable(Map<String, Object> attributes,
                                                           final AMQQueue queue,
                                                           final ExchangeImpl exchange)

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1588501&r1=1588500&r2=1588501&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Fri Apr 18 16:03:11 2014
@@ -626,12 +626,6 @@ public abstract class AbstractExchange<T
                            true);
     }
 
-    @Override
-    public void restoreBinding(final UUID id, final String bindingKey, final AMQQueue queue,
-                               final Map<String, Object> argumentMap)
-    {
-        makeBinding(id, bindingKey,queue, argumentMap,true, false);
-    }
 
     private void removeBinding(final BindingImpl binding)
     {
@@ -713,18 +707,10 @@ public abstract class AbstractExchange<T
             if (existingMapping == null)
             {
                 BindingImpl b = new BindingImpl(id, attributes, queue, this);
-                b.addStateChangeListener(_bindingListener);
-                b.open();
+                b.create();
 
-                if (b.isDurable() && !restore)
-                {
-                    DurableConfigurationStoreHelper.createBinding(_virtualHost.getDurableConfigurationStore(), b);
-                }
-                _bindingsMap.put(bindingIdentifier, b);
-                queue.addBinding(b);
-                childAdded(b);
+                addBinding(b);
 
-                doAddBinding(b);
 
                 return true;
             }
@@ -742,6 +728,20 @@ public abstract class AbstractExchange<T
         }
     }
 
+    @Override
+    public void addBinding(final BindingImpl b)
+    {
+        b.addStateChangeListener(_bindingListener);
+
+        BindingIdentifier identifier = new BindingIdentifier(b.getName(), b.getAMQQueue());
+
+        _bindingsMap.put(identifier, b);
+        b.getAMQQueue().addBinding(b);
+        childAdded(b);
+
+        doAddBinding(b);
+    }
+
     protected abstract void onBindingUpdated(final BindingImpl binding,
                                              final Map<String, Object> oldArguments);
 

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java?rev=1588501&r1=1588500&r2=1588501&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java Fri Apr 18 16:03:11 2014
@@ -59,9 +59,6 @@ public interface ExchangeImpl<T extends 
                            AMQQueue queue,
                            Map<String, Object> arguments);
 
-    void restoreBinding(UUID id, String bindingKey, AMQQueue queue,
-                        Map<String, Object> argumentMap);
-
     void delete();
 
     /**
@@ -114,6 +111,8 @@ public interface ExchangeImpl<T extends 
 
     EventLogger getEventLogger();
 
+    void addBinding(BindingImpl binding);
+
 
     public interface BindingListener
     {

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java?rev=1588501&r1=1588500&r2=1588501&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java Fri Apr 18 16:03:11 2014
@@ -63,6 +63,10 @@ public class ConfiguredObjectFactoryImpl
                 {
                     _defaultTypes.put(categoryName, annotation.defaultType());
                 }
+                else
+                {
+                    _defaultTypes.put(categoryName, categoryName);
+                }
 
             }
             if(categoryFactories.put(factory.getType(),factory) != null)

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java?rev=1588501&r1=1588500&r2=1588501&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java Fri Apr 18 16:03:11 2014
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.virtualhost;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -29,11 +30,16 @@ import org.apache.log4j.Logger;
 
 import org.apache.qpid.server.binding.BindingImpl;
 import org.apache.qpid.server.exchange.ExchangeImpl;
+import org.apache.qpid.server.model.Binding;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.ConfiguredObjectFactory;
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.store.AbstractDurableConfiguredObjectRecoverer;
 import org.apache.qpid.server.store.ConfiguredObjectRecord;
+import org.apache.qpid.server.store.UnresolvedConfiguredObject;
 import org.apache.qpid.server.store.UnresolvedDependency;
 import org.apache.qpid.server.store.UnresolvedObject;
 
@@ -41,11 +47,14 @@ public class BindingRecoverer extends Ab
 {
     private static final Logger _logger = Logger.getLogger(BindingRecoverer.class);
 
-    private final VirtualHostImpl _virtualHost;
+    private final VirtualHostImpl<?,?,?> _virtualHost;
+    private final ConfiguredObjectFactory _objectFactory;
 
-    public BindingRecoverer(final VirtualHostImpl virtualHost)
+    public BindingRecoverer(final VirtualHostImpl<?,?,?> virtualHost)
     {
         _virtualHost = virtualHost;
+        Broker<?> broker = _virtualHost.getParent(Broker.class);
+        _objectFactory = broker.getObjectFactory();
     }
 
     @Override
@@ -67,6 +76,7 @@ public class BindingRecoverer extends Ab
         private final UUID _queueId;
         private final UUID _exchangeId;
         private final UUID _bindingId;
+        private final ConfiguredObjectRecord _record;
 
         private List<UnresolvedDependency> _unresolvedDependencies =
                 new ArrayList<UnresolvedDependency>();
@@ -76,6 +86,7 @@ public class BindingRecoverer extends Ab
 
         public UnresolvedBinding(final ConfiguredObjectRecord record)
         {
+            _record = record;
             _bindingId = record.getId();
             _exchangeId = record.getParents().get(Exchange.class.getSimpleName()).getId();
             _queueId = record.getParents().get(Queue.class.getSimpleName()).getId();
@@ -90,6 +101,7 @@ public class BindingRecoverer extends Ab
                 _unresolvedDependencies.add(new QueueDependency());
             }
 
+
             _bindingName = (String) record.getAttributes().get(org.apache.qpid.server.model.Binding.NAME);
             _bindingArgumentsMap = (Map<String, Object>) record.getAttributes().get(org.apache.qpid.server.model.Binding.ARGUMENTS);
         }
@@ -108,7 +120,17 @@ public class BindingRecoverer extends Ab
                 _logger.info("Restoring binding: (Exchange: " + _exchange.getName() + ", Queue: " + _queue.getName()
                              + ", Routing Key: " + _bindingName + ", Arguments: " + _bindingArgumentsMap + ")");
 
-                _exchange.restoreBinding(_bindingId, _bindingName, _queue, _bindingArgumentsMap);
+
+                Map<String,Object> attributesWithId = new HashMap<String,Object>(_record.getAttributes());
+                attributesWithId.put(org.apache.qpid.server.model.Exchange.ID,_record.getId());
+                attributesWithId.put(org.apache.qpid.server.model.Exchange.DURABLE,true);
+
+                ConfiguredObjectTypeFactory<? extends Binding> configuredObjectTypeFactory =
+                        _objectFactory.getConfiguredObjectTypeFactory(Binding.class, attributesWithId);
+                UnresolvedConfiguredObject<? extends Binding> unresolvedConfiguredObject =
+                        configuredObjectTypeFactory.recover(_record, _exchange, _queue);
+                Binding binding = (Binding<?>) unresolvedConfiguredObject.resolve();
+
             }
             return (_exchange).getBinding(_bindingName, _queue);
         }

Modified: qpid/trunk/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory?rev=1588501&r1=1588500&r2=1588501&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory Fri Apr 18 16:03:11 2014
@@ -44,6 +44,7 @@ org.apache.qpid.server.exchange.DirectEx
 org.apache.qpid.server.exchange.FanoutExchangeFactory
 org.apache.qpid.server.exchange.HeadersExchangeFactory
 org.apache.qpid.server.exchange.TopicExchangeFactory
+org.apache.qpid.server.binding.BindingFactory
 
 
 

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java?rev=1588501&r1=1588500&r2=1588501&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java Fri Apr 18 16:03:11 2014
@@ -84,6 +84,7 @@ public class DurableConfigurationRecover
     private ConfiguredObjectFactory _configuredObjectFactory;
     private ConfiguredObjectTypeFactory _exchangeFactory;
     private ConfiguredObjectTypeFactory _queueFactory;
+    private ConfiguredObjectTypeFactory _bindingFactory;
 
     @Override
     public void setUp() throws Exception
@@ -92,6 +93,8 @@ public class DurableConfigurationRecover
         _configuredObjectFactory = mock(ConfiguredObjectFactory.class);
         _exchangeFactory = mock(ConfiguredObjectTypeFactory.class);
         _queueFactory = mock(ConfiguredObjectTypeFactory.class);
+        _bindingFactory = mock(ConfiguredObjectTypeFactory.class);
+
 
 
         AMQQueue<?> queue = mock(AMQQueue.class);
@@ -109,6 +112,8 @@ public class DurableConfigurationRecover
 
         when(_configuredObjectFactory.getConfiguredObjectTypeFactory(eq(Exchange.class), anyMap())).thenReturn(_exchangeFactory);
         when(_configuredObjectFactory.getConfiguredObjectTypeFactory(eq(Queue.class), anyMap())).thenReturn(_queueFactory);
+        when(_configuredObjectFactory.getConfiguredObjectTypeFactory(eq(Binding.class), anyMap())).thenReturn(_bindingFactory);
+
 
 
         final ArgumentCaptor<ConfiguredObjectRecord> recoveredExchange = ArgumentCaptor.forClass(ConfiguredObjectRecord.class);
@@ -169,6 +174,33 @@ public class DurableConfigurationRecover
         }).when(_queueFactory).recover(recoveredQueue.capture(), any(ConfiguredObject.class));
 
 
+        final ArgumentCaptor<ConfiguredObjectRecord> recoveredBinding = ArgumentCaptor.forClass(ConfiguredObjectRecord.class);
+        final ArgumentCaptor<ConfiguredObject> parent1 = ArgumentCaptor.forClass(ConfiguredObject.class);
+        final ArgumentCaptor<ConfiguredObject> parent2 = ArgumentCaptor.forClass(ConfiguredObject.class);
+
+        doAnswer(new Answer()
+        {
+
+            @Override
+            public Object answer(final InvocationOnMock invocation) throws Throwable
+            {
+                ConfiguredObjectRecord queueRecord = recoveredBinding.getValue();
+                Binding binding = mock(Binding.class);
+                UUID id = queueRecord.getId();
+                String name = (String) queueRecord.getAttributes().get("name");
+                when(binding.getId()).thenReturn(id);
+                when(binding.getName()).thenReturn(name);
+
+                UnresolvedConfiguredObject unresolved = mock(UnresolvedConfiguredObject.class);
+                when(unresolved.resolve()).thenReturn(binding);
+
+
+                return unresolved;
+            }
+        }).when(_bindingFactory).recover(recoveredBinding.capture(), parent1.capture(), parent2.capture());
+
+
+
         DurableConfiguredObjectRecoverer[] recoverers = {
                 new QueueRecoverer(_vhost),
                 new ExchangeRecoverer(_vhost),



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org

Reply via email to