Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java?rev=1665306&r1=1665305&r2=1665306&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java
 Mon Mar  9 17:12:14 2015
@@ -32,6 +32,9 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.server.configuration.BrokerProperties;
@@ -147,7 +150,8 @@ public class FileBasedGroupProviderImpl
             GroupAdapter groupAdapter = new GroupAdapter(attrMap);
             principals.add(groupAdapter);
             groupAdapter.registerWithParents();
-            groupAdapter.open();
+            // TODO - we know this is safe, but the sync method shouldn't 
really be called from the management thread
+            groupAdapter.openAsync();
         }
 
     }
@@ -265,7 +269,7 @@ public class FileBasedGroupProviderImpl
     }
 
     @StateTransition( currentState = { State.UNINITIALIZED, State.QUIESCED, 
State.ERRORED }, desiredState = State.ACTIVE )
-    private void activate()
+    private ListenableFuture<Void> activate()
     {
         if (_groupDatabase != null)
         {
@@ -282,29 +286,48 @@ public class FileBasedGroupProviderImpl
                 throw new IllegalConfigurationException(String.format("Cannot 
load groups from '%s'", getPath()));
             }
         }
+        return Futures.immediateFuture(null);
     }
 
     @StateTransition( currentState = { State.QUIESCED, State.ACTIVE, 
State.ERRORED}, desiredState = State.DELETED )
-    private void doDelete()
+    private ListenableFuture<Void> doDelete()
     {
-        close();
-        File file = new File(getPath());
-        if (file.exists())
-        {
-            if (!file.delete())
-            {
-                throw new IllegalConfigurationException("Cannot delete group 
file");
-            }
-        }
-
-        deleted();
-        setState(State.DELETED);
+        final SettableFuture<Void> returnVal = SettableFuture.create();
+        closeAsync().addListener(
+                new Runnable()
+                {
+                    @Override
+                    public void run()
+                    {
+                        try
+                        {
+                            File file = new File(getPath());
+                            if (file.exists())
+                            {
+                                if (!file.delete())
+                                {
+                                    throw new 
IllegalConfigurationException("Cannot delete group file");
+                                }
+                            }
+
+                            deleted();
+                            setState(State.DELETED);
+                        }
+                        finally
+                        {
+                            returnVal.set(null);
+                        }
+                    }
+                }, getTaskExecutor().getExecutor()
+                           );
+        return returnVal;
     }
 
     @StateTransition( currentState = State.UNINITIALIZED, desiredState = 
State.QUIESCED)
-    private void startQuiesced()
+    private ListenableFuture<Void> startQuiesced()
     {
         setState(State.QUIESCED);
+        return Futures.immediateFuture(null);
     }
 
     public Set<Principal> getGroupPrincipalsForUser(String username)
@@ -377,9 +400,10 @@ public class FileBasedGroupProviderImpl
         }
 
         @StateTransition( currentState = State.UNINITIALIZED, desiredState = 
State.ACTIVE )
-        private void activate()
+        private ListenableFuture<Void> activate()
         {
             setState(State.ACTIVE);
+            return Futures.immediateFuture(null);
         }
 
         @Override
@@ -396,7 +420,8 @@ public class FileBasedGroupProviderImpl
                 attrMap.put(GroupMember.NAME, principal.getName());
                 GroupMemberAdapter groupMemberAdapter = new 
GroupMemberAdapter(attrMap);
                 groupMemberAdapter.registerWithParents();
-                groupMemberAdapter.open();
+                // todo - this will be safe, but the synchronous open should 
not be called from the management thread
+                groupMemberAdapter.openAsync();
                 members.add(groupMemberAdapter);
             }
             _groupPrincipal = new GroupPrincipal(getName());
@@ -459,12 +484,13 @@ public class FileBasedGroupProviderImpl
         }
 
         @StateTransition( currentState = State.ACTIVE, desiredState = 
State.DELETED )
-        private void doDelete()
+        private ListenableFuture<Void> doDelete()
         {
             getSecurityManager().authoriseGroupOperation(Operation.DELETE, 
getName());
             _groupDatabase.removeGroup(getName());
             deleted();
             setState(State.DELETED);
+            return Futures.immediateFuture(null);
         }
 
         @Override
@@ -522,19 +548,21 @@ public class FileBasedGroupProviderImpl
             }
 
             @StateTransition(currentState = State.UNINITIALIZED, desiredState 
= State.ACTIVE)
-            private void activate()
+            private ListenableFuture<Void> activate()
             {
                 setState(State.ACTIVE);
+                return Futures.immediateFuture(null);
             }
 
             @StateTransition(currentState = State.ACTIVE, desiredState = 
State.DELETED)
-            private void doDelete()
+            private ListenableFuture<Void> doDelete()
             {
                 getSecurityManager().authoriseGroupOperation(Operation.UPDATE, 
GroupAdapter.this.getName());
 
                 _groupDatabase.removeUserFromGroup(getName(), 
GroupAdapter.this.getName());
                 deleted();
                 setState(State.DELETED);
+                return Futures.immediateFuture(null);
             }
 
             @Override

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java?rev=1665306&r1=1665305&r2=1665306&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java
 Mon Mar  9 17:12:14 2015
@@ -37,16 +37,17 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import org.apache.log4j.Logger;
-import org.apache.qpid.server.configuration.BrokerProperties;
-import org.apache.qpid.server.util.BaseAction;
-import org.apache.qpid.server.util.FileHelper;
 import org.codehaus.jackson.JsonParser;
 import org.codehaus.jackson.JsonProcessingException;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.map.SerializationConfig;
 import org.codehaus.jackson.type.TypeReference;
 
+import org.apache.qpid.server.configuration.BrokerProperties;
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
 import org.apache.qpid.server.model.AbstractConfiguredObject;
 import org.apache.qpid.server.model.AuthenticationProvider;
@@ -55,6 +56,8 @@ import org.apache.qpid.server.model.Mana
 import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
 import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.StateTransition;
+import org.apache.qpid.server.util.BaseAction;
+import org.apache.qpid.server.util.FileHelper;
 
 
 public class FileSystemPreferencesProviderImpl
@@ -128,7 +131,7 @@ public class FileSystemPreferencesProvid
     }
 
     @StateTransition( currentState = {State.UNINITIALIZED, State.ERRORED}, 
desiredState = State.ACTIVE )
-    private void activate()
+    private ListenableFuture<Void> activate()
     {
         if (_store != null)
         {
@@ -138,6 +141,7 @@ public class FileSystemPreferencesProvid
         {
             throw new IllegalStateException("Cannot open preferences provider 
" + getName() + " in state " + getState() );
         }
+        return Futures.immediateFuture(null);
     }
 
     @Override
@@ -171,33 +175,52 @@ public class FileSystemPreferencesProvid
     }
 
     @StateTransition(currentState = { State.ACTIVE }, desiredState = 
State.QUIESCED)
-    private void doQuiesce()
+    private ListenableFuture<Void> doQuiesce()
     {
         if(_store != null)
         {
             _store.close();
         }
         setState(State.QUIESCED);
+        return Futures.immediateFuture(null);
     }
 
     @StateTransition(currentState = { State.ACTIVE, State.QUIESCED, 
State.ERRORED }, desiredState = State.DELETED )
-    private void doDelete()
+    private ListenableFuture<Void> doDelete()
     {
-        close();
+        final SettableFuture<Void> returnVal = SettableFuture.create();
+        closeAsync().addListener(
+                new Runnable()
+                {
+                    @Override
+                    public void run()
+                    {
+                        try
+                        {
+                            if(_store != null)
+                            {
+                                _store.close();
+                                _store.delete();
+                                deleted();
+                                
_authenticationProvider.setPreferencesProvider(null);
+
+                            }
+                            setState(State.DELETED);
+                        }
+                        finally
+                        {
+                            returnVal.set(null);
+                        }
+                    }
+                }, getTaskExecutor().getExecutor()
+                           );
 
-        if(_store != null)
-        {
-            _store.close();
-            _store.delete();
-            deleted();
-            _authenticationProvider.setPreferencesProvider(null);
+        return returnVal;
 
-        }
-        setState(State.DELETED);
     }
 
     @StateTransition(currentState = State.QUIESCED, desiredState = 
State.ACTIVE )
-    private void restart()
+    private ListenableFuture<Void> restart()
     {
         if (_store == null)
         {
@@ -206,6 +229,7 @@ public class FileSystemPreferencesProvid
 
         _store.open();
         setState(State.ACTIVE);
+        return Futures.immediateFuture(null);
     }
 
     @Override

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java?rev=1665306&r1=1665305&r2=1665306&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
 Mon Mar  9 17:12:14 2015
@@ -26,6 +26,9 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
 import org.apache.qpid.server.model.AbstractConfiguredObject;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.Consumer;
@@ -169,10 +172,11 @@ final class SessionAdapter extends Abstr
     }
 
     @StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED)
-    private void doDelete()
+    private ListenableFuture<Void> doDelete()
     {
         deleted();
         setState(State.DELETED);
+        return Futures.immediateFuture(null);
     }
 
 }

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java?rev=1665306&r1=1665305&r2=1665306&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java
 Mon Mar  9 17:12:14 2015
@@ -28,6 +28,9 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
@@ -228,14 +231,24 @@ abstract public class AbstractPort<X ext
     }
 
     @StateTransition(currentState = { State.ACTIVE, State.QUIESCED, 
State.ERRORED}, desiredState = State.DELETED )
-    private void doDelete()
+    private ListenableFuture<Void> doDelete()
     {
-        close();
-        setState(State.DELETED);
+        final SettableFuture<Void> returnVal = SettableFuture.create();
+        closeAsync().addListener(new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                setState(State.DELETED);
+                returnVal.set(null);
+
+            }
+        }, getTaskExecutor().getExecutor());
+        return returnVal;
     }
 
     @StateTransition( currentState = {State.UNINITIALIZED, State.QUIESCED, 
State.ERRORED}, desiredState = State.ACTIVE )
-    protected void activate()
+    protected ListenableFuture<Void> activate()
     {
         try
         {
@@ -246,12 +259,14 @@ abstract public class AbstractPort<X ext
             setState(State.ERRORED);
             throw new IllegalConfigurationException("Unable to active port '" 
+ getName() + "'of type " + getType() + " on " + getPort(), e);
         }
+        return Futures.immediateFuture(null);
     }
 
     @StateTransition( currentState = State.UNINITIALIZED, desiredState = 
State.QUIESCED)
-    private void startQuiesced()
+    private ListenableFuture<Void> startQuiesced()
     {
         setState(State.QUIESCED);
+        return Futures.immediateFuture(null);
     }
 
 

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java?rev=1665306&r1=1665305&r2=1665306&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java
 Mon Mar  9 17:12:14 2015
@@ -23,6 +23,8 @@ package org.apache.qpid.server.model.por
 import java.util.Map;
 import java.util.Set;
 
+import com.google.common.util.concurrent.ListenableFuture;
+
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.ConfiguredObject;
@@ -108,6 +110,14 @@ public class PortFactory<X extends Port<
     }
 
     @Override
+    public ListenableFuture<X> createAsync(final ConfiguredObjectFactory 
factory,
+                                           final Map<String, Object> 
attributes,
+                                           final ConfiguredObject<?>... 
parents)
+    {
+        return getPortFactory(factory, attributes, 
(Broker<?>)parents[0]).createAsync(factory, attributes,parents);
+    }
+
+    @Override
     public UnresolvedConfiguredObject<X> recover(final ConfiguredObjectFactory 
factory,
                                                  final ConfiguredObjectRecord 
record,
                                                  final ConfiguredObject<?>... 
parents)

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ConfiguredObjectTypeFactory.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ConfiguredObjectTypeFactory.java?rev=1665306&r1=1665305&r2=1665306&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ConfiguredObjectTypeFactory.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ConfiguredObjectTypeFactory.java
 Mon Mar  9 17:12:14 2015
@@ -20,19 +20,23 @@
  */
 package org.apache.qpid.server.plugin;
 
+import java.util.Map;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.ConfiguredObjectFactory;
 import org.apache.qpid.server.store.ConfiguredObjectRecord;
 import org.apache.qpid.server.store.UnresolvedConfiguredObject;
 
-import java.util.Map;
-
 public interface ConfiguredObjectTypeFactory<X extends ConfiguredObject<X>> 
extends Pluggable
 {
     Class<? super X> getCategoryClass();
 
     X create(final ConfiguredObjectFactory factory, Map<String, Object> 
attributes, ConfiguredObject<?>... parents);
 
+    ListenableFuture<X> createAsync(final ConfiguredObjectFactory factory, 
Map<String, Object> attributes, ConfiguredObject<?>... parents);
+
     UnresolvedConfiguredObject<X> recover(final ConfiguredObjectFactory 
factory,
                                           ConfiguredObjectRecord record,
                                           ConfiguredObject<?>... parents);

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1665306&r1=1665305&r2=1665306&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
 Mon Mar  9 17:12:14 2015
@@ -43,12 +43,15 @@ import java.util.concurrent.atomic.Atomi
 
 import javax.security.auth.Subject;
 
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.pool.ReferenceCountingExecutorService;
 import org.apache.qpid.server.binding.BindingImpl;
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.configuration.updater.Task;
+import org.apache.qpid.server.configuration.updater.TaskWithException;
 import org.apache.qpid.server.connection.SessionPrincipal;
 import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.consumer.ConsumerTarget;
@@ -650,16 +653,51 @@ public abstract class AbstractQueue<X ex
 
 
     @Override
-    public synchronized QueueConsumerImpl addConsumer(final ConsumerTarget 
target,
-                                     FilterManager filters,
+    public QueueConsumerImpl addConsumer(final ConsumerTarget target,
+                                     final FilterManager filters,
                                      final Class<? extends ServerMessage> 
messageClass,
                                      final String consumerName,
-                                     EnumSet<ConsumerImpl.Option> optionSet)
+                                     final EnumSet<ConsumerImpl.Option> 
optionSet)
             throws ExistingExclusiveConsumer, 
ExistingConsumerPreventsExclusive,
                    ConsumerAccessRefused
     {
 
+        try
+        {
+            return getTaskExecutor().run(new 
TaskWithException<QueueConsumerImpl, Exception>()
+            {
 
+                @Override
+                public QueueConsumerImpl execute()
+                        throws Exception
+                {
+
+                    return addConsumerInternal(target, filters, messageClass, 
consumerName, optionSet);
+                }
+            });
+        }
+        catch (ExistingExclusiveConsumer | ConsumerAccessRefused |
+            ExistingConsumerPreventsExclusive | RuntimeException e)
+        {
+            throw e;
+        }
+        catch (Exception e)
+        {
+            // Should never happen
+            throw new ServerScopedRuntimeException(e);
+        }
+
+
+    }
+
+    private QueueConsumerImpl addConsumerInternal(final ConsumerTarget target,
+                                                  FilterManager filters,
+                                                  final Class<? extends 
ServerMessage> messageClass,
+                                                  final String consumerName,
+                                                  EnumSet<ConsumerImpl.Option> 
optionSet)
+            throws ExistingExclusiveConsumer, ConsumerAccessRefused,
+                   ExistingConsumerPreventsExclusive
+    {
         if (hasExclusiveConsumer())
         {
             throw new ExistingExclusiveConsumer();
@@ -771,7 +809,7 @@ public abstract class AbstractQueue<X ex
         QueueConsumerImpl consumer = new QueueConsumerImpl(this,
                                                            target,
                                                            consumerName,
-                                                           filters, 
+                                                           filters,
                                                            messageClass,
                                                            optionSet);
 
@@ -820,7 +858,6 @@ public abstract class AbstractQueue<X ex
         deliverAsync();
 
         return consumer;
-
     }
 
     @Override
@@ -832,7 +869,7 @@ public abstract class AbstractQueue<X ex
 
 
 
-    synchronized void unregisterConsumer(final QueueConsumerImpl consumer)
+    void unregisterConsumer(final QueueConsumerImpl consumer)
     {
         if (consumer == null)
         {
@@ -843,7 +880,7 @@ public abstract class AbstractQueue<X ex
 
         if (removed)
         {
-            consumer.close();
+            consumer.closeAsync();
             // No longer can the queue have an exclusive consumer
             setExclusiveSubscriber(null);
 
@@ -1802,7 +1839,15 @@ public abstract class AbstractQueue<X ex
 
             for (BindingImpl b : bindingCopy)
             {
-                b.delete();
+                // TODO - RG - Need to sort out bindings!
+                if(getTaskExecutor().isTaskExecutorThread())
+                {
+                    b.deleteAsync();
+                }
+                else
+                {
+                    b.delete();
+                }
             }
 
             QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = 
_consumerList.iterator();
@@ -1855,7 +1900,7 @@ public abstract class AbstractQueue<X ex
             }
 
             _deleteTaskList.clear();
-            close();
+            closeAsync();
             deleted();
             //Log Queue Deletion
             getEventLogger().message(_logSubject, QueueMessages.DELETED());
@@ -2661,7 +2706,7 @@ public abstract class AbstractQueue<X ex
         return allowed;
     }
 
-    private synchronized void updateExclusivityPolicy(ExclusivityPolicy 
desiredPolicy)
+    private void updateExclusivityPolicy(ExclusivityPolicy desiredPolicy)
             throws ExistingConsumerPreventsExclusive
     {
         if(desiredPolicy == null)
@@ -2863,24 +2908,27 @@ public abstract class AbstractQueue<X ex
     //=============
 
     @StateTransition(currentState = {State.UNINITIALIZED,State.ERRORED}, 
desiredState = State.ACTIVE)
-    private void activate()
+    private ListenableFuture<Void> activate()
     {
         setState(State.ACTIVE);
+        return Futures.immediateFuture(null);
     }
 
     @StateTransition(currentState = State.UNINITIALIZED, desiredState = 
State.DELETED)
-    private void doDeleteBeforeInitialize()
+    private ListenableFuture<Void> doDeleteBeforeInitialize()
     {
         preSetAlternateExchange();
         setState(State.DELETED);
+        return Futures.immediateFuture(null);
     }
 
     @StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED)
-    private void doDelete()
+    private ListenableFuture<Void> doDelete()
     {
         _virtualHost.removeQueue(this);
         preSetAlternateExchange();
         setState(State.DELETED);
+        return Futures.immediateFuture(null);
     }
 
 

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1665306&r1=1665305&r2=1665306&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
 Mon Mar  9 17:12:14 2015
@@ -191,7 +191,7 @@ class QueueConsumerImpl
 
         if(newState == ConsumerTarget.State.CLOSED && oldState != newState && 
!_closed.get())
         {
-            close();
+            closeAsync();
         }
         final StateChangeListener<? super QueueConsumerImpl, State> 
stateListener = getStateListener();
         if(stateListener != null)

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java?rev=1665306&r1=1665305&r2=1665306&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java
 Mon Mar  9 17:12:14 2015
@@ -22,6 +22,8 @@ package org.apache.qpid.server.queue;
 
 import java.util.Map;
 
+import com.google.common.util.concurrent.ListenableFuture;
+
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.ConfiguredObjectFactory;
 import org.apache.qpid.server.model.Port;
@@ -49,6 +51,14 @@ public class QueueFactory<X extends Queu
     }
 
     @Override
+    public ListenableFuture<X> createAsync(final ConfiguredObjectFactory 
factory,
+                                           final Map<String, Object> 
attributes,
+                                           final ConfiguredObject<?>... 
parents)
+    {
+        return getQueueFactory(factory, attributes).createAsync(factory, 
attributes, parents);
+    }
+
+    @Override
     public UnresolvedConfiguredObject<X> recover(final ConfiguredObjectFactory 
factory,
                                                  final ConfiguredObjectRecord 
record,
                                                  final ConfiguredObject<?>... 
parents)

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java?rev=1665306&r1=1665305&r2=1665306&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java
 Mon Mar  9 17:12:14 2015
@@ -38,6 +38,9 @@ import java.util.Set;
 import javax.net.ssl.KeyManager;
 import javax.net.ssl.KeyManagerFactory;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
 import org.apache.qpid.server.model.AbstractConfiguredObject;
 import org.apache.qpid.server.model.Broker;
@@ -98,7 +101,7 @@ public class FileKeyStoreImpl extends Ab
     }
 
     @StateTransition(currentState = {State.ACTIVE, State.ERRORED}, 
desiredState = State.DELETED)
-    protected void doDelete()
+    protected ListenableFuture<Void> doDelete()
     {
         // verify that it is not in use
         String storeName = getName();
@@ -113,12 +116,14 @@ public class FileKeyStoreImpl extends Ab
         }
         deleted();
         setState(State.DELETED);
+        return Futures.immediateFuture(null);
     }
 
     @StateTransition(currentState = {State.UNINITIALIZED, State.ERRORED}, 
desiredState = State.ACTIVE)
-    protected void doActivate()
+    protected ListenableFuture<Void> doActivate()
     {
         setState(State.ACTIVE);
+        return Futures.immediateFuture(null);
     }
 
     @Override

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java?rev=1665306&r1=1665305&r2=1665306&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java
 Mon Mar  9 17:12:14 2015
@@ -38,6 +38,9 @@ import javax.net.ssl.TrustManager;
 import javax.net.ssl.TrustManagerFactory;
 import javax.net.ssl.X509TrustManager;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
 import org.apache.qpid.server.model.AbstractConfiguredObject;
 import org.apache.qpid.server.model.AuthenticationProvider;
@@ -98,7 +101,7 @@ public class FileTrustStoreImpl extends
     }
 
     @StateTransition(currentState = {State.ACTIVE, State.ERRORED}, 
desiredState = State.DELETED)
-    protected void doDelete()
+    protected ListenableFuture<Void> doDelete()
     {
         // verify that it is not in use
         String storeName = getName();
@@ -139,12 +142,14 @@ public class FileTrustStoreImpl extends
         }
         deleted();
         setState(State.DELETED);
+        return Futures.immediateFuture(null);
     }
 
     @StateTransition(currentState = {State.UNINITIALIZED, State.ERRORED}, 
desiredState = State.ACTIVE)
-    protected void doActivate()
+    protected ListenableFuture<Void> doActivate()
     {
         setState(State.ACTIVE);
+        return Futures.immediateFuture(null);
     }
 
     @Override

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStoreImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStoreImpl.java?rev=1665306&r1=1665305&r2=1665306&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStoreImpl.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStoreImpl.java
 Mon Mar  9 17:12:14 2015
@@ -57,6 +57,8 @@ import javax.net.ssl.KeyManager;
 import javax.net.ssl.KeyManagerFactory;
 import javax.xml.bind.DatatypeConverter;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
@@ -184,7 +186,7 @@ public class NonJavaKeyStoreImpl extends
     }
 
     @StateTransition(currentState = {State.ACTIVE, State.ERRORED}, 
desiredState = State.DELETED)
-    protected void doDelete()
+    protected ListenableFuture<Void> doDelete()
     {
         // verify that it is not in use
         String storeName = getName();
@@ -202,12 +204,14 @@ public class NonJavaKeyStoreImpl extends
         }
         deleted();
         setState(State.DELETED);
+        return Futures.immediateFuture(null);
     }
 
     @StateTransition(currentState = {State.UNINITIALIZED, State.ERRORED}, 
desiredState = State.ACTIVE)
-    protected void doActivate()
+    protected ListenableFuture<Void> doActivate()
     {
         setState(State.ACTIVE);
+        return Futures.immediateFuture(null);
     }
 
     @Override

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaTrustStoreImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaTrustStoreImpl.java?rev=1665306&r1=1665305&r2=1665306&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaTrustStoreImpl.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaTrustStoreImpl.java
 Mon Mar  9 17:12:14 2015
@@ -45,6 +45,8 @@ import javax.net.ssl.TrustManager;
 import javax.net.ssl.TrustManagerFactory;
 import javax.security.auth.x500.X500Principal;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
@@ -172,7 +174,7 @@ public class NonJavaTrustStoreImpl
     }
 
     @StateTransition(currentState = {State.ACTIVE, State.ERRORED}, 
desiredState = State.DELETED)
-    protected void doDelete()
+    protected ListenableFuture<Void> doDelete()
     {
         // verify that it is not in use
         String storeName = getName();
@@ -215,12 +217,14 @@ public class NonJavaTrustStoreImpl
         }
         deleted();
         setState(State.DELETED);
+        return Futures.immediateFuture(null);
     }
 
     @StateTransition(currentState = {State.UNINITIALIZED, State.ERRORED}, 
desiredState = State.ACTIVE)
-    protected void doActivate()
+    protected ListenableFuture<Void> doActivate()
     {
         setState(State.ACTIVE);
+        return Futures.immediateFuture(null);
     }
 
     @Override

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java?rev=1665306&r1=1665305&r2=1665306&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java
 Mon Mar  9 17:12:14 2015
@@ -28,6 +28,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
@@ -175,13 +178,14 @@ public abstract class AbstractAuthentica
     }
 
     @StateTransition( currentState = State.UNINITIALIZED, desiredState = 
State.QUIESCED )
-    protected void startQuiesced()
+    protected ListenableFuture<Void> startQuiesced()
     {
         setState(State.QUIESCED);
+        return Futures.immediateFuture(null);
     }
 
     @StateTransition( currentState = { State.UNINITIALIZED, State.QUIESCED, 
State.QUIESCED }, desiredState = State.ACTIVE )
-    protected void activate()
+    protected ListenableFuture<Void> activate()
     {
         try
         {
@@ -199,11 +203,11 @@ public abstract class AbstractAuthentica
                 throw e;
             }
         }
-
+        return Futures.immediateFuture(null);
     }
 
     @StateTransition( currentState = { State.ACTIVE, State.QUIESCED, 
State.ERRORED}, desiredState = State.DELETED)
-    protected void doDelete()
+    protected ListenableFuture<Void> doDelete()
     {
 
         String providerName = getName();
@@ -219,15 +223,50 @@ public abstract class AbstractAuthentica
             }
         }
 
-        close();
-        if (_preferencesProvider != null)
-        {
-            _preferencesProvider.delete();
-        }
-        deleted();
+        final SettableFuture<Void> returnVal = SettableFuture.create();
 
-        setState(State.DELETED);
+        final ListenableFuture<Void> future = closeAsync();
+        future.addListener(new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                if (_preferencesProvider != null)
+                {
+                    _preferencesProvider.deleteAsync().addListener(new 
Runnable()
+                    {
+                        @Override
+                        public void run()
+                        {
+                            try
+                            {
+                                deleted();
+                                setState(State.DELETED);
+                            }
+                            finally
+                            {
+                                returnVal.set(null);
+                            }
+                        }
+                    }, getTaskExecutor().getExecutor());
+                }
+                else
+                {
+                    try
+                    {
+                        deleted();
+
+                        setState(State.DELETED);
+                    }
+                    finally
+                    {
+                        returnVal.set(null);
+                    }
+                }
+            }
+        }, getTaskExecutor().getExecutor());
 
+        return  returnVal;
     }
 
     @Override

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ConfigModelPasswordManagingAuthenticationProvider.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ConfigModelPasswordManagingAuthenticationProvider.java?rev=1665306&r1=1665305&r2=1665306&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ConfigModelPasswordManagingAuthenticationProvider.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ConfigModelPasswordManagingAuthenticationProvider.java
 Mon Mar  9 17:12:14 2015
@@ -98,22 +98,15 @@ public abstract class ConfigModelPasswor
     @Override
     public void deleteUser(final String user) throws AccountNotFoundException
     {
-        runTask(new VoidTaskWithException<AccountNotFoundException>()
+        final ManagedUser authUser = getUser(user);
+        if(authUser != null)
         {
-            @Override
-            public void execute() throws AccountNotFoundException
-            {
-                final ManagedUser authUser = getUser(user);
-                if(authUser != null)
-                {
-                    authUser.delete();
-                }
-                else
-                {
-                    throw new AccountNotFoundException("No such user: '" + 
user + "'");
-                }
-            }
-        });
+            authUser.delete();
+        }
+        else
+        {
+            throw new AccountNotFoundException("No such user: '" + user + "'");
+        }
     }
 
     @Override

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ManagedUser.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ManagedUser.java?rev=1665306&r1=1665305&r2=1665306&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ManagedUser.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ManagedUser.java
 Mon Mar  9 17:12:14 2015
@@ -27,6 +27,9 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
 import org.apache.qpid.server.configuration.updater.VoidTask;
 import org.apache.qpid.server.model.AbstractConfiguredObject;
 import org.apache.qpid.server.model.ConfiguredObject;
@@ -96,10 +99,11 @@ class ManagedUser extends AbstractConfig
     }
 
     @StateTransition(currentState = {State.ACTIVE}, desiredState = 
State.DELETED)
-    private void doDelete()
+    private ListenableFuture<Void> doDelete()
     {
         _authenticationManager.getUserMap().remove(getName());
         deleted();
+        return Futures.immediateFuture(null);
     }
 
 

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java?rev=1665306&r1=1665305&r2=1665306&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
 Mon Mar  9 17:12:14 2015
@@ -40,6 +40,9 @@ import javax.security.auth.login.Account
 import javax.security.sasl.SaslException;
 import javax.security.sasl.SaslServer;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.server.configuration.BrokerProperties;
@@ -119,16 +122,9 @@ public abstract class PrincipalDatabaseA
         super.onOpen();
         _principalDatabase = createDatabase();
         initialise();
-        List<Principal> users = _principalDatabase == null ? 
Collections.<Principal>emptyList() : _principalDatabase.getUsers();
-        for (Principal user : users)
-        {
-            PrincipalAdapter principalAdapter = new PrincipalAdapter(user);
-            principalAdapter.registerWithParents();
-            principalAdapter.open();
-            _userMap.put(user, principalAdapter);
-        }
     }
 
+
     protected abstract PrincipalDatabase createDatabase();
 
 
@@ -217,9 +213,44 @@ public abstract class PrincipalDatabaseA
         return _principalDatabase;
     }
 
+    @StateTransition(currentState = {State.UNINITIALIZED,State.ERRORED}, 
desiredState = State.ACTIVE)
+    public ListenableFuture<Void> activate()
+    {
+        final SettableFuture<Void> returnVal = SettableFuture.create();
+        final List<Principal> users = _principalDatabase == null ? 
Collections.<Principal>emptyList() : _principalDatabase.getUsers();
+        _userMap.clear();
+        if(!users.isEmpty())
+        {
+            for (final Principal user : users)
+            {
+                final PrincipalAdapter principalAdapter = new 
PrincipalAdapter(user);
+                principalAdapter.registerWithParents();
+                principalAdapter.openAsync().addListener(new Runnable()
+                {
+                    @Override
+                    public void run()
+                    {
+                        _userMap.put(user, principalAdapter);
+                        if (_userMap.size() == users.size())
+                        {
+                            setState(State.ACTIVE);
+                            returnVal.set(null);
+                        }
+                    }
+                }, getTaskExecutor().getExecutor());
+
+            }
+
+            return returnVal;
+        }
+        else
+        {
+            return Futures.immediateFuture(null);
+        }
+    }
 
-    @StateTransition( currentState = { State.ACTIVE, State.QUIESCED, 
State.ERRORED}, desiredState = State.DELETED)
-    public void doDelete()
+    @StateTransition( currentState = { State.ACTIVE, State.QUIESCED, 
State.ERRORED, State.UNINITIALIZED}, desiredState = State.DELETED)
+    public ListenableFuture<Void> doDelete()
     {
         File file = new File(_path);
         if (file.exists() && file.isFile())
@@ -228,6 +259,7 @@ public abstract class PrincipalDatabaseA
         }
         deleted();
         setState(State.DELETED);
+        return Futures.immediateFuture(null);
     }
 
     @Override
@@ -465,13 +497,14 @@ public abstract class PrincipalDatabaseA
         }
 
         @StateTransition(currentState = {State.UNINITIALIZED,State.ERRORED}, 
desiredState = State.ACTIVE)
-        private void activate()
+        private ListenableFuture<Void> activate()
         {
             setState(State.ACTIVE);
+            return Futures.immediateFuture(null);
         }
 
         @StateTransition(currentState = State.ACTIVE, desiredState = 
State.DELETED)
-        private void doDelete()
+        private ListenableFuture<Void> doDelete()
         {
             try
             {
@@ -489,7 +522,7 @@ public abstract class PrincipalDatabaseA
             {
                 LOGGER.warn("Failed to delete user " + _user, e);
             }
-
+            return Futures.immediateFuture(null);
         }
 
         @Override

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupImpl.java?rev=1665306&r1=1665305&r2=1665306&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupImpl.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupImpl.java
 Mon Mar  9 17:12:14 2015
@@ -22,6 +22,9 @@ package org.apache.qpid.server.security.
 
 import java.util.Map;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
 import org.apache.qpid.server.model.AbstractConfiguredObject;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.Group;
@@ -77,16 +80,18 @@ public class GroupImpl extends AbstractC
 
 
     @StateTransition( currentState = { State.UNINITIALIZED, State.QUIESCED, 
State.ERRORED }, desiredState = State.ACTIVE )
-    private void activate()
+    private ListenableFuture<Void> activate()
     {
         setState(State.ACTIVE);
+        return Futures.immediateFuture(null);
     }
 
 
     @StateTransition(currentState = {State.ACTIVE}, desiredState = 
State.DELETED)
-    private void doDelete()
+    private ListenableFuture<Void> doDelete()
     {
         deleted();
+        return Futures.immediateFuture(null);
     }
 
 }

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupMemberImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupMemberImpl.java?rev=1665306&r1=1665305&r2=1665306&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupMemberImpl.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupMemberImpl.java
 Mon Mar  9 17:12:14 2015
@@ -23,6 +23,9 @@ package org.apache.qpid.server.security.
 import java.security.Principal;
 import java.util.Map;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
 import org.apache.qpid.server.model.AbstractConfiguredObject;
 import org.apache.qpid.server.model.Group;
 import org.apache.qpid.server.model.GroupMember;
@@ -61,15 +64,17 @@ public class GroupMemberImpl extends Abs
 
 
     @StateTransition( currentState = { State.UNINITIALIZED, State.QUIESCED, 
State.ERRORED }, desiredState = State.ACTIVE )
-    private void activate()
+    private ListenableFuture<Void> activate()
     {
         setState(State.ACTIVE);
+        return Futures.immediateFuture(null);
     }
 
 
     @StateTransition(currentState = {State.ACTIVE}, desiredState = 
State.DELETED)
-    private void doDelete()
+    private ListenableFuture<Void> doDelete()
     {
         deleted();
+        return Futures.immediateFuture(null);
     }
 }

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupProviderImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupProviderImpl.java?rev=1665306&r1=1665305&r2=1665306&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupProviderImpl.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupProviderImpl.java
 Mon Mar  9 17:12:14 2015
@@ -26,6 +26,9 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
 import org.apache.qpid.server.model.AbstractConfiguredObject;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.ConfiguredObject;
@@ -89,16 +92,18 @@ public class GroupProviderImpl extends A
     }
 
     @StateTransition( currentState = { State.UNINITIALIZED, State.QUIESCED, 
State.ERRORED }, desiredState = State.ACTIVE )
-    private void activate()
+    private ListenableFuture<Void> activate()
     {
         setState(State.ACTIVE);
+        return Futures.immediateFuture(null);
     }
 
 
     @StateTransition(currentState = {State.ACTIVE}, desiredState = 
State.DELETED)
-    private void doDelete()
+    private ListenableFuture<Void> doDelete()
     {
         deleted();
+        return Futures.immediateFuture(null);
     }
 
 }

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1665306&r1=1665305&r2=1665306&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
 Mon Mar  9 17:12:14 2015
@@ -38,11 +38,15 @@ import java.util.concurrent.ScheduledFut
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.security.auth.Subject;
 
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.exchange.ExchangeDefaults;
@@ -388,27 +392,65 @@ public abstract class AbstractVirtualHos
         return isStoreEmptyHandler.isEmpty();
     }
 
-    protected void createDefaultExchanges()
+    protected ListenableFuture<Void> createDefaultExchanges()
     {
-        Subject.doAs(getSecurityManager().getSubjectWithAddedSystemRights(), 
new PrivilegedAction<Void>()
+        return 
Subject.doAs(getSecurityManager().getSubjectWithAddedSystemRights(), new 
PrivilegedAction<ListenableFuture<Void>>()
         {
+            private static final int TOTAL_STANDARD_EXCHANGES = 4;
+            private final AtomicInteger _createdExchangeCount = new 
AtomicInteger();
+            private SettableFuture<Void> _future = SettableFuture.create();
+
             @Override
-            public Void run()
+            public ListenableFuture<Void> run()
             {
                 addStandardExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME, 
ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
                 addStandardExchange(ExchangeDefaults.TOPIC_EXCHANGE_NAME, 
ExchangeDefaults.TOPIC_EXCHANGE_CLASS);
                 addStandardExchange(ExchangeDefaults.HEADERS_EXCHANGE_NAME, 
ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
                 addStandardExchange(ExchangeDefaults.FANOUT_EXCHANGE_NAME, 
ExchangeDefaults.FANOUT_EXCHANGE_CLASS);
-                return null;
+                return _future;
+            }
+
+            private void standardExchangeCreated()
+            {
+                if(_createdExchangeCount.incrementAndGet() == 
TOTAL_STANDARD_EXCHANGES)
+                {
+                    _future.set(null);
+                }
             }
 
-            void addStandardExchange(String name, String type)
+            ListenableFuture<Void> addStandardExchange(String name, String 
type)
             {
+
                 Map<String, Object> attributes = new HashMap<String, Object>();
                 attributes.put(Exchange.NAME, name);
                 attributes.put(Exchange.TYPE, type);
                 attributes.put(Exchange.ID, 
UUIDGenerator.generateExchangeUUID(name, getName()));
-                childAdded(addExchange(attributes));
+                final ListenableFuture<ExchangeImpl> future = 
addExchangeAsync(attributes);
+                final SettableFuture<Void> returnVal = SettableFuture.create();
+                Futures.addCallback(future, new FutureCallback<ExchangeImpl>()
+                {
+                    @Override
+                    public void onSuccess(final ExchangeImpl result)
+                    {
+                        try
+                        {
+                            childAdded(result);
+                        }
+                        finally
+                        {
+                            standardExchangeCreated();
+                        }
+
+                    }
+
+                    @Override
+                    public void onFailure(final Throwable t)
+                    {
+                        standardExchangeCreated();
+                    }
+                }, getTaskExecutor().getExecutor());
+
+                return returnVal;
             }
         });
     }
@@ -777,6 +819,23 @@ public abstract class AbstractVirtualHos
 
     }
 
+    private ListenableFuture<ExchangeImpl> addExchangeAsync(Map<String,Object> 
attributes)
+            throws ExchangeExistsException, ReservedExchangeNameException,
+                   NoFactoryForTypeException
+    {
+        try
+        {
+            ListenableFuture result = 
getObjectFactory().createAsync(Exchange.class, attributes, this);
+            return result;
+        }
+        catch (DuplicateNameException e)
+        {
+            throw new ExchangeExistsException(getExchange(e.getName()));
+        }
+
+    }
+
+
     @Override
     public void removeExchange(ExchangeImpl exchange, boolean force)
             throws ExchangeIsAlternateException, RequiredExchangeException
@@ -809,7 +868,6 @@ public abstract class AbstractVirtualHos
     @Override
     protected ListenableFuture<Void> beforeClose()
     {
-        _logger.debug("KWDEBUG setting state to UNAVAILABLE");
         setState(State.UNAVAILABLE);
 
         return super.beforeClose();
@@ -818,7 +876,6 @@ public abstract class AbstractVirtualHos
     @Override
     protected void onClose()
     {
-        _logger.debug("KWDEBUG onClose");
         //Stop Connections
         _connectionRegistry.close();
         _dtxRegistry.close();
@@ -830,7 +887,6 @@ public abstract class AbstractVirtualHos
 
     private void closeMessageStore()
     {
-        _logger.debug("KWDEBUG closeMessageStore");
         if (getMessageStore() != null)
         {
             try
@@ -1312,38 +1368,76 @@ public abstract class AbstractVirtualHos
     }
 
     @StateTransition( currentState = { State.UNINITIALIZED, State.ACTIVE, 
State.ERRORED }, desiredState = State.STOPPED )
-    protected void doStop()
+    protected ListenableFuture<Void> doStop()
     {
-        // TODO - need to deal with async close children
-        closeChildren();
-        shutdownHouseKeeping();
-        closeMessageStore();
-        setState(State.STOPPED);
+        final SettableFuture<Void> returnVal = SettableFuture.create();
+        closeChildren().addListener(
+                new Runnable()
+                {
+                    @Override
+                    public void run()
+                    {
+                        try
+                        {
+                            shutdownHouseKeeping();
+                            closeMessageStore();
+                            setState(State.STOPPED);
+
+                        }
+                        finally
+                        {
+                            returnVal.set(null);
+                        }
+                    }
+                }, getTaskExecutor().getExecutor()
+                                   );
+        return returnVal;
     }
 
     @StateTransition( currentState = { State.ACTIVE, State.ERRORED }, 
desiredState = State.DELETED )
-    private void doDelete()
+    private ListenableFuture<Void> doDelete()
     {
         if(_deleted.compareAndSet(false,true))
         {
+            final SettableFuture<Void> returnVal = SettableFuture.create();
             String hostName = getName();
 
-            close();
+            closeAsync().addListener(
+                    new Runnable()
+                    {
+                        @Override
+                        public void run()
+                        {
+                            try
+                            {
+                                MessageStore ms = getMessageStore();
+                                if (ms != null)
+                                {
+                                    try
+                                    {
+                                        ms.onDelete(AbstractVirtualHost.this);
+                                    }
+                                    catch (Exception e)
+                                    {
+                                        _logger.warn("Exception occurred on 
message store deletion", e);
+                                    }
+                                }
+                                deleted();
+                                setState(State.DELETED);
+                            }
+                            finally
+                            {
+                                returnVal.set(null);
+                            }
+                        }
+                    }, getTaskExecutor().getExecutor()
+                               );
 
-            MessageStore ms = getMessageStore();
-            if (ms != null)
-            {
-                try
-                {
-                    ms.onDelete(this);
-                }
-                catch (Exception e)
-                {
-                    _logger.warn("Exception occurred on message store 
deletion", e);
-                }
-            }
-            deleted();
-            setState(State.DELETED);
+            return returnVal;
+        }
+        else
+        {
+            return Futures.immediateFuture(null);
         }
     }
 
@@ -1532,7 +1626,7 @@ public abstract class AbstractVirtualHos
     }
 
     @StateTransition( currentState = { State.UNINITIALIZED,State.ERRORED }, 
desiredState = State.ACTIVE )
-    private void onActivate()
+    private ListenableFuture<Void> onActivate()
     {
         _houseKeepingTasks = new 
ScheduledThreadPoolExecutor(getHousekeepingThreadCount(), new 
SuppressingInheritedAccessControlContextThreadFactory());
 
@@ -1552,9 +1646,28 @@ public abstract class AbstractVirtualHos
 
         if (isStoreEmpty())
         {
-            createDefaultExchanges();
+            final SettableFuture<Void> returnVal = SettableFuture.create();
+            createDefaultExchanges().addListener(new Runnable()
+            {
+                @Override
+                public void run()
+                {
+                    postCreateDefaultExchangeTasks();
+                    returnVal.set(null);
+                }
+            }, getTaskExecutor().getExecutor());
+            return returnVal;
         }
+        else
+        {
+            postCreateDefaultExchangeTasks();
 
+            return Futures.immediateFuture(null);
+        }
+    }
+
+    private void postCreateDefaultExchangeTasks()
+    {
         if(getContextValue(Boolean.class, USE_ASYNC_RECOVERY))
         {
             _messageStoreRecoverer = new AsynchronousMessageStoreRecoverer();
@@ -1589,9 +1702,32 @@ public abstract class AbstractVirtualHos
             scheduleHouseKeepingTask(getHousekeepingCheckPeriod(), 
_fileSystemSpaceChecker);
         }
     }
+    private static class ChildCounter
+    {
+        private final AtomicInteger _count = new AtomicInteger();
+        private final Runnable _task;
+
+        private ChildCounter(final Runnable task)
+        {
+            _task = task;
+        }
+
+        public void incrementCount()
+        {
+            _count.incrementAndGet();
+        }
+
+        public void decrementCount()
+        {
+            if(_count.decrementAndGet() == 0)
+            {
+                _task.run();
+            }
+        }
+    }
 
     @StateTransition( currentState = { State.STOPPED, State.ERRORED }, 
desiredState = State.ACTIVE )
-    private void onRestart()
+    private ListenableFuture<Void> onRestart()
     {
         resetStatistics();
 
@@ -1622,6 +1758,25 @@ public abstract class AbstractVirtualHos
 
         new GenericRecoverer(this).recover(records);
 
+        final SettableFuture<Void> returnVal = SettableFuture.create();
+        final ChildCounter counter = new ChildCounter(new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                onActivate().addListener(
+                        new Runnable()
+                        {
+                            @Override
+                            public void run()
+                            {
+                                returnVal.set(null);
+                            }
+                        }, getTaskExecutor().getExecutor()
+                                        );
+            }
+        });
+        counter.incrementCount();
         Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(), new 
PrivilegedAction<Object>()
         {
             @Override
@@ -1632,14 +1787,22 @@ public abstract class AbstractVirtualHos
                     @Override
                     public void performAction(final ConfiguredObject<?> object)
                     {
-                        object.open();
+                        counter.incrementCount();
+                        object.openAsync().addListener(new Runnable()
+                        {
+                            @Override
+                            public void run()
+                            {
+                                counter.decrementCount();
+                            }
+                        }, getTaskExecutor().getExecutor());
                     }
                 });
                 return null;
             }
         });
-
-        onActivate();
+        counter.decrementCount();
+        return returnVal;
     }
 
     private class FileSystemSpaceChecker extends HouseKeepingTask

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java?rev=1665306&r1=1665305&r2=1665306&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java
 Mon Mar  9 17:12:14 2015
@@ -29,6 +29,8 @@ import java.util.Map;
 
 import javax.security.auth.Subject;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
@@ -68,7 +70,7 @@ public abstract class AbstractStandardVi
     }
 
     @Override
-    protected void activate()
+    protected ListenableFuture<Void> activate()
     {
         if (LOGGER.isDebugEnabled())
         {
@@ -107,15 +109,21 @@ public abstract class AbstractStandardVi
         if (host != null)
         {
             final VirtualHost<?,?,?> recoveredHost = host;
-            Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(), 
new PrivilegedAction<Object>()
-            {
-                @Override
-                public Object run()
-                {
-                    recoveredHost.open();
-                    return null;
-                }
-            });
+            final ListenableFuture<Void> openFuture = 
Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(),
+                                                                   new 
PrivilegedAction<ListenableFuture<Void>>()
+                                                                   {
+                                                                       
@Override
+                                                                       public 
ListenableFuture<Void> run()
+                                                                       {
+                                                                           
return recoveredHost.openAsync();
+
+                                                                       }
+                                                                   });
+            return openFuture;
+        }
+        else
+        {
+            return Futures.immediateFuture(null);
         }
     }
 

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java?rev=1665306&r1=1665305&r2=1665306&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java
 Mon Mar  9 17:12:14 2015
@@ -38,7 +38,10 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.exchange.ExchangeDefaults;
@@ -122,16 +125,47 @@ public abstract class AbstractVirtualHos
     }
 
     @StateTransition( currentState = {State.UNINITIALIZED, State.STOPPED, 
State.ERRORED }, desiredState = State.ACTIVE )
-    protected void doActivate()
+    protected ListenableFuture<Void> doActivate()
     {
+        final SettableFuture<Void> returnVal = SettableFuture.create();
+
         try
         {
-            activate();
-            setState(State.ACTIVE);
+            Futures.addCallback(activate(),
+                                new FutureCallback<Void>()
+                                {
+                                    @Override
+                                    public void onSuccess(final Void result)
+                                    {
+                                        try
+                                        {
+                                            setState(State.ACTIVE);
+                                        }
+                                        finally
+                                        {
+                                            returnVal.set(null);
+                                        }
+
+                                    }
+
+                                    @Override
+                                    public void onFailure(final Throwable t)
+                                    {
+
+                                        setState(State.ERRORED);
+                                        returnVal.set(null);
+                                        if (_broker.isManagementMode())
+                                        {
+                                            LOGGER.warn("Failed to make " + 
this + " active.", t);
+                                        }
+                                    }
+                                }, getTaskExecutor().getExecutor()
+                               );
         }
         catch(RuntimeException e)
         {
             setState(State.ERRORED);
+            returnVal.set(null);
             if (_broker.isManagementMode())
             {
                 LOGGER.warn("Failed to make " + this + " active.", e);
@@ -141,6 +175,7 @@ public abstract class AbstractVirtualHos
                 throw e;
             }
         }
+        return returnVal;
     }
 
     @Override
@@ -183,40 +218,73 @@ public abstract class AbstractVirtualHos
     }
 
     @StateTransition( currentState = { State.ACTIVE, State.STOPPED, 
State.ERRORED}, desiredState = State.DELETED )
-    protected void doDelete()
+    protected ListenableFuture<Void> doDelete()
     {
+        final SettableFuture<Void> returnVal = SettableFuture.create();
         setState(State.DELETED);
         deleteVirtualHostIfExists();
-        final ListenableFuture<Void> closeFuture = close();
-        deleted();
-        DurableConfigurationStore configurationStore = getConfigurationStore();
-        if (configurationStore != null)
+        final ListenableFuture<Void> closeFuture = closeAsync();
+        closeFuture.addListener(new Runnable()
         {
-            configurationStore.onDelete(this);
-        }
+            @Override
+            public void run()
+            {
+                try
+                {
+                    deleted();
+                    DurableConfigurationStore configurationStore = 
getConfigurationStore();
+                    if (configurationStore != null)
+                    {
+                        
configurationStore.onDelete(AbstractVirtualHostNode.this);
+                    }
+                }
+                finally
+                {
+                    returnVal.set(null);
+                }
+            }
+        }, getTaskExecutor().getExecutor());
+
+        return returnVal;
+
     }
 
-    protected void deleteVirtualHostIfExists()
+    protected ListenableFuture<Void> deleteVirtualHostIfExists()
     {
         VirtualHost<?, ?, ?> virtualHost = getVirtualHost();
         if (virtualHost != null)
         {
-            virtualHost.delete();
+            return virtualHost.deleteAsync();
+        }
+        else
+        {
+            return Futures.immediateFuture(null);
         }
     }
 
     @StateTransition( currentState = { State.ACTIVE, State.ERRORED, 
State.UNINITIALIZED }, desiredState = State.STOPPED )
-    protected void doStop()
+    protected ListenableFuture<Void> doStop()
     {
-        stopAndSetStateTo(State.STOPPED);
+        return stopAndSetStateTo(State.STOPPED);
     }
 
-    protected void stopAndSetStateTo(State stoppedState)
+    protected ListenableFuture<Void> stopAndSetStateTo(final State 
stoppedState)
     {
-        // TODO - deal with async close children
-        closeChildren();
-        closeConfigurationStoreSafely();
-        setState(stoppedState);
+        final SettableFuture<Void> returnVal = SettableFuture.create();
+
+        ListenableFuture<Void> childCloseFuture = closeChildren();
+        childCloseFuture.addListener(new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                closeConfigurationStoreSafely();
+                setState(stoppedState);
+                returnVal.set(null);
+            }
+        }, getTaskExecutor().getExecutor());
+
+        return returnVal;
     }
 
     @Override
@@ -311,7 +379,7 @@ public abstract class AbstractVirtualHos
 
     protected abstract DurableConfigurationStore createConfigurationStore();
 
-    protected abstract void activate();
+    protected abstract ListenableFuture<Void> activate();
 
 
 

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java?rev=1665306&r1=1665305&r2=1665306&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java
 Mon Mar  9 17:12:14 2015
@@ -24,6 +24,8 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -63,7 +65,7 @@ public class RedirectingVirtualHostNodeI
     }
 
     @StateTransition( currentState = {State.UNINITIALIZED, State.STOPPED, 
State.ERRORED }, desiredState = State.ACTIVE )
-    protected void doActivate()
+    protected ListenableFuture<Void> doActivate()
     {
         try
         {
@@ -83,6 +85,7 @@ public class RedirectingVirtualHostNodeI
                 throw e;
             }
         }
+        return Futures.immediateFuture(null);
     }
 
     @Override

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java?rev=1665306&r1=1665305&r2=1665306&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
 Mon Mar  9 17:12:14 2015
@@ -151,6 +151,7 @@ public class HeadersBindingTest extends
         _count++;
         _queue = mock(AMQQueue.class);
         TaskExecutor executor = new CurrentThreadTaskExecutor();
+        executor.start();
         VirtualHostImpl vhost = mock(VirtualHostImpl.class);
         when(_queue.getVirtualHost()).thenReturn(vhost);
         when(_queue.getModel()).thenReturn(BrokerModel.getInstance());
@@ -158,6 +159,7 @@ public class HeadersBindingTest extends
         
when(vhost.getSecurityManager()).thenReturn(mock(org.apache.qpid.server.security.SecurityManager.class));
         final EventLogger eventLogger = new EventLogger();
         when(vhost.getEventLogger()).thenReturn(eventLogger);
+        when(vhost.getTaskExecutor()).thenReturn(executor);
         _exchange = mock(ExchangeImpl.class);
         
when(_exchange.getType()).thenReturn(ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
         when(_exchange.getEventLogger()).thenReturn(eventLogger);

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java?rev=1665306&r1=1665305&r2=1665306&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
 Mon Mar  9 17:12:14 2015
@@ -22,6 +22,7 @@ package org.apache.qpid.server.model;
 
 import static java.util.Arrays.asList;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doAnswer;
@@ -33,12 +34,15 @@ import static org.mockito.Mockito.verify
 import static org.mockito.Mockito.when;
 
 import java.security.AccessControlException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
+import org.mockito.ArgumentCaptor;
 import org.mockito.ArgumentMatcher;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -55,6 +59,7 @@ import org.apache.qpid.server.security.a
 import org.apache.qpid.server.store.ConfiguredObjectRecord;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.BrokerTestHelper;
 import org.apache.qpid.server.virtualhost.TestMemoryVirtualHost;
 import org.apache.qpid.test.utils.QpidTestCase;
@@ -413,7 +418,30 @@ public class VirtualHostTest extends Qpi
     private AMQConnectionModel createMockProtocolConnection(final 
VirtualHost<?, ?, ?> virtualHost)
     {
         final AMQConnectionModel connection = mock(AMQConnectionModel.class);
+        final List<Action<?>> tasks = new ArrayList<>();
+        final ArgumentCaptor<Action> deleteTaskCaptor = 
ArgumentCaptor.forClass(Action.class);
+        Answer answer = new Answer()
+        {
+            @Override
+            public Object answer(final InvocationOnMock invocation) throws 
Throwable
+            {
+                return tasks.add(deleteTaskCaptor.getValue());
+            }
+        };
+        
doAnswer(answer).when(connection).addDeleteTask(deleteTaskCaptor.capture());
         when(connection.getVirtualHost()).thenReturn(virtualHost);
+        doAnswer(new Answer()
+        {
+            @Override
+            public Object answer(final InvocationOnMock invocation) throws 
Throwable
+            {
+                for(Action action : tasks)
+                {
+                    action.performAction(connection);
+                }
+                return null;
+            }
+        }).when(connection).closeAsync(any(AMQConstant.class),anyString());
         when(connection.getRemoteAddressString()).thenReturn("peer:1234");
         return connection;
     }

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodels/lifecycle/TestConfiguredObject.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodels/lifecycle/TestConfiguredObject.java?rev=1665306&r1=1665305&r2=1665306&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodels/lifecycle/TestConfiguredObject.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodels/lifecycle/TestConfiguredObject.java
 Mon Mar  9 17:12:14 2015
@@ -26,6 +26,9 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
 import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
 import org.apache.qpid.server.configuration.updater.TaskExecutor;
@@ -127,15 +130,17 @@ public class TestConfiguredObject extend
     }
 
     @StateTransition( currentState = {State.ERRORED, State.UNINITIALIZED}, 
desiredState = State.ACTIVE )
-    protected void activate()
+    protected ListenableFuture<Void> activate()
     {
         setState(State.ACTIVE);
+        return Futures.immediateFuture(null);
     }
 
     @StateTransition( currentState = {State.ERRORED, State.UNINITIALIZED}, 
desiredState = State.DELETED )
-    protected void doDelete()
+    protected ListenableFuture<Void> doDelete()
     {
         setState(State.DELETED);
+        return Futures.immediateFuture(null);
     }
 
     public boolean isOpened()

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java?rev=1665306&r1=1665305&r2=1665306&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
 Mon Mar  9 17:12:14 2015
@@ -31,14 +31,12 @@ import static org.mockito.Mockito.verify
 import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.UUID;
 
 import org.apache.log4j.Logger;
 import org.mockito.ArgumentCaptor;
@@ -56,7 +54,6 @@ import org.apache.qpid.server.message.Me
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.LifetimePolicy;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.QueueNotificationListener;
 import org.apache.qpid.server.model.UUIDGenerator;



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to