Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java?rev=1665306&r1=1665305&r2=1665306&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java Mon Mar 9 17:12:14 2015 @@ -32,6 +32,9 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.BrokerProperties; @@ -147,7 +150,8 @@ public class FileBasedGroupProviderImpl GroupAdapter groupAdapter = new GroupAdapter(attrMap); principals.add(groupAdapter); groupAdapter.registerWithParents(); - groupAdapter.open(); + // TODO - we know this is safe, but the sync method shouldn't really be called from the management thread + groupAdapter.openAsync(); } } @@ -265,7 +269,7 @@ public class FileBasedGroupProviderImpl } @StateTransition( currentState = { State.UNINITIALIZED, State.QUIESCED, State.ERRORED }, desiredState = State.ACTIVE ) - private void activate() + private ListenableFuture<Void> activate() { if (_groupDatabase != null) { @@ -282,29 +286,48 @@ public class FileBasedGroupProviderImpl throw new IllegalConfigurationException(String.format("Cannot load groups from '%s'", getPath())); } } + return Futures.immediateFuture(null); } @StateTransition( currentState = { State.QUIESCED, State.ACTIVE, State.ERRORED}, desiredState = State.DELETED ) - private void doDelete() + private ListenableFuture<Void> doDelete() { - close(); - File file = new File(getPath()); - if (file.exists()) - { - if (!file.delete()) - { - throw new IllegalConfigurationException("Cannot delete group file"); - } - } - - deleted(); - setState(State.DELETED); + final SettableFuture<Void> returnVal = SettableFuture.create(); + closeAsync().addListener( + new Runnable() + { + @Override + public void run() + { + try + { + File file = new File(getPath()); + if (file.exists()) + { + if (!file.delete()) + { + throw new IllegalConfigurationException("Cannot delete group file"); + } + } + + deleted(); + setState(State.DELETED); + } + finally + { + returnVal.set(null); + } + } + }, getTaskExecutor().getExecutor() + ); + return returnVal; } @StateTransition( currentState = State.UNINITIALIZED, desiredState = State.QUIESCED) - private void startQuiesced() + private ListenableFuture<Void> startQuiesced() { setState(State.QUIESCED); + return Futures.immediateFuture(null); } public Set<Principal> getGroupPrincipalsForUser(String username) @@ -377,9 +400,10 @@ public class FileBasedGroupProviderImpl } @StateTransition( currentState = State.UNINITIALIZED, desiredState = State.ACTIVE ) - private void activate() + private ListenableFuture<Void> activate() { setState(State.ACTIVE); + return Futures.immediateFuture(null); } @Override @@ -396,7 +420,8 @@ public class FileBasedGroupProviderImpl attrMap.put(GroupMember.NAME, principal.getName()); GroupMemberAdapter groupMemberAdapter = new GroupMemberAdapter(attrMap); groupMemberAdapter.registerWithParents(); - groupMemberAdapter.open(); + // todo - this will be safe, but the synchronous open should not be called from the management thread + groupMemberAdapter.openAsync(); members.add(groupMemberAdapter); } _groupPrincipal = new GroupPrincipal(getName()); @@ -459,12 +484,13 @@ public class FileBasedGroupProviderImpl } @StateTransition( currentState = State.ACTIVE, desiredState = State.DELETED ) - private void doDelete() + private ListenableFuture<Void> doDelete() { getSecurityManager().authoriseGroupOperation(Operation.DELETE, getName()); _groupDatabase.removeGroup(getName()); deleted(); setState(State.DELETED); + return Futures.immediateFuture(null); } @Override @@ -522,19 +548,21 @@ public class FileBasedGroupProviderImpl } @StateTransition(currentState = State.UNINITIALIZED, desiredState = State.ACTIVE) - private void activate() + private ListenableFuture<Void> activate() { setState(State.ACTIVE); + return Futures.immediateFuture(null); } @StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED) - private void doDelete() + private ListenableFuture<Void> doDelete() { getSecurityManager().authoriseGroupOperation(Operation.UPDATE, GroupAdapter.this.getName()); _groupDatabase.removeUserFromGroup(getName(), GroupAdapter.this.getName()); deleted(); setState(State.DELETED); + return Futures.immediateFuture(null); } @Override
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java?rev=1665306&r1=1665305&r2=1665306&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java Mon Mar 9 17:12:14 2015 @@ -37,16 +37,17 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.apache.log4j.Logger; -import org.apache.qpid.server.configuration.BrokerProperties; -import org.apache.qpid.server.util.BaseAction; -import org.apache.qpid.server.util.FileHelper; import org.codehaus.jackson.JsonParser; import org.codehaus.jackson.JsonProcessingException; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.SerializationConfig; import org.codehaus.jackson.type.TypeReference; +import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.model.AbstractConfiguredObject; import org.apache.qpid.server.model.AuthenticationProvider; @@ -55,6 +56,8 @@ import org.apache.qpid.server.model.Mana import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.StateTransition; +import org.apache.qpid.server.util.BaseAction; +import org.apache.qpid.server.util.FileHelper; public class FileSystemPreferencesProviderImpl @@ -128,7 +131,7 @@ public class FileSystemPreferencesProvid } @StateTransition( currentState = {State.UNINITIALIZED, State.ERRORED}, desiredState = State.ACTIVE ) - private void activate() + private ListenableFuture<Void> activate() { if (_store != null) { @@ -138,6 +141,7 @@ public class FileSystemPreferencesProvid { throw new IllegalStateException("Cannot open preferences provider " + getName() + " in state " + getState() ); } + return Futures.immediateFuture(null); } @Override @@ -171,33 +175,52 @@ public class FileSystemPreferencesProvid } @StateTransition(currentState = { State.ACTIVE }, desiredState = State.QUIESCED) - private void doQuiesce() + private ListenableFuture<Void> doQuiesce() { if(_store != null) { _store.close(); } setState(State.QUIESCED); + return Futures.immediateFuture(null); } @StateTransition(currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED }, desiredState = State.DELETED ) - private void doDelete() + private ListenableFuture<Void> doDelete() { - close(); + final SettableFuture<Void> returnVal = SettableFuture.create(); + closeAsync().addListener( + new Runnable() + { + @Override + public void run() + { + try + { + if(_store != null) + { + _store.close(); + _store.delete(); + deleted(); + _authenticationProvider.setPreferencesProvider(null); + + } + setState(State.DELETED); + } + finally + { + returnVal.set(null); + } + } + }, getTaskExecutor().getExecutor() + ); - if(_store != null) - { - _store.close(); - _store.delete(); - deleted(); - _authenticationProvider.setPreferencesProvider(null); + return returnVal; - } - setState(State.DELETED); } @StateTransition(currentState = State.QUIESCED, desiredState = State.ACTIVE ) - private void restart() + private ListenableFuture<Void> restart() { if (_store == null) { @@ -206,6 +229,7 @@ public class FileSystemPreferencesProvid _store.open(); setState(State.ACTIVE); + return Futures.immediateFuture(null); } @Override Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java?rev=1665306&r1=1665305&r2=1665306&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java Mon Mar 9 17:12:14 2015 @@ -26,6 +26,9 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.model.AbstractConfiguredObject; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Consumer; @@ -169,10 +172,11 @@ final class SessionAdapter extends Abstr } @StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED) - private void doDelete() + private ListenableFuture<Void> doDelete() { deleted(); setState(State.DELETED); + return Futures.immediateFuture(null); } } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java?rev=1665306&r1=1665305&r2=1665306&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java Mon Mar 9 17:12:14 2015 @@ -28,6 +28,9 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.IllegalConfigurationException; @@ -228,14 +231,24 @@ abstract public class AbstractPort<X ext } @StateTransition(currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED}, desiredState = State.DELETED ) - private void doDelete() + private ListenableFuture<Void> doDelete() { - close(); - setState(State.DELETED); + final SettableFuture<Void> returnVal = SettableFuture.create(); + closeAsync().addListener(new Runnable() + { + @Override + public void run() + { + setState(State.DELETED); + returnVal.set(null); + + } + }, getTaskExecutor().getExecutor()); + return returnVal; } @StateTransition( currentState = {State.UNINITIALIZED, State.QUIESCED, State.ERRORED}, desiredState = State.ACTIVE ) - protected void activate() + protected ListenableFuture<Void> activate() { try { @@ -246,12 +259,14 @@ abstract public class AbstractPort<X ext setState(State.ERRORED); throw new IllegalConfigurationException("Unable to active port '" + getName() + "'of type " + getType() + " on " + getPort(), e); } + return Futures.immediateFuture(null); } @StateTransition( currentState = State.UNINITIALIZED, desiredState = State.QUIESCED) - private void startQuiesced() + private ListenableFuture<Void> startQuiesced() { setState(State.QUIESCED); + return Futures.immediateFuture(null); } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java?rev=1665306&r1=1665305&r2=1665306&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java Mon Mar 9 17:12:14 2015 @@ -23,6 +23,8 @@ package org.apache.qpid.server.model.por import java.util.Map; import java.util.Set; +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfiguredObject; @@ -108,6 +110,14 @@ public class PortFactory<X extends Port< } @Override + public ListenableFuture<X> createAsync(final ConfiguredObjectFactory factory, + final Map<String, Object> attributes, + final ConfiguredObject<?>... parents) + { + return getPortFactory(factory, attributes, (Broker<?>)parents[0]).createAsync(factory, attributes,parents); + } + + @Override public UnresolvedConfiguredObject<X> recover(final ConfiguredObjectFactory factory, final ConfiguredObjectRecord record, final ConfiguredObject<?>... parents) Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ConfiguredObjectTypeFactory.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ConfiguredObjectTypeFactory.java?rev=1665306&r1=1665305&r2=1665306&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ConfiguredObjectTypeFactory.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ConfiguredObjectTypeFactory.java Mon Mar 9 17:12:14 2015 @@ -20,19 +20,23 @@ */ package org.apache.qpid.server.plugin; +import java.util.Map; + +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.ConfiguredObjectFactory; import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.UnresolvedConfiguredObject; -import java.util.Map; - public interface ConfiguredObjectTypeFactory<X extends ConfiguredObject<X>> extends Pluggable { Class<? super X> getCategoryClass(); X create(final ConfiguredObjectFactory factory, Map<String, Object> attributes, ConfiguredObject<?>... parents); + ListenableFuture<X> createAsync(final ConfiguredObjectFactory factory, Map<String, Object> attributes, ConfiguredObject<?>... parents); + UnresolvedConfiguredObject<X> recover(final ConfiguredObjectFactory factory, ConfiguredObjectRecord record, ConfiguredObject<?>... parents); Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1665306&r1=1665305&r2=1665306&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Mon Mar 9 17:12:14 2015 @@ -43,12 +43,15 @@ import java.util.concurrent.atomic.Atomi import javax.security.auth.Subject; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import org.apache.log4j.Logger; import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.server.binding.BindingImpl; import org.apache.qpid.server.configuration.IllegalConfigurationException; +import org.apache.qpid.server.configuration.updater.Task; +import org.apache.qpid.server.configuration.updater.TaskWithException; import org.apache.qpid.server.connection.SessionPrincipal; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.consumer.ConsumerTarget; @@ -650,16 +653,51 @@ public abstract class AbstractQueue<X ex @Override - public synchronized QueueConsumerImpl addConsumer(final ConsumerTarget target, - FilterManager filters, + public QueueConsumerImpl addConsumer(final ConsumerTarget target, + final FilterManager filters, final Class<? extends ServerMessage> messageClass, final String consumerName, - EnumSet<ConsumerImpl.Option> optionSet) + final EnumSet<ConsumerImpl.Option> optionSet) throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive, ConsumerAccessRefused { + try + { + return getTaskExecutor().run(new TaskWithException<QueueConsumerImpl, Exception>() + { + @Override + public QueueConsumerImpl execute() + throws Exception + { + + return addConsumerInternal(target, filters, messageClass, consumerName, optionSet); + } + }); + } + catch (ExistingExclusiveConsumer | ConsumerAccessRefused | + ExistingConsumerPreventsExclusive | RuntimeException e) + { + throw e; + } + catch (Exception e) + { + // Should never happen + throw new ServerScopedRuntimeException(e); + } + + + } + + private QueueConsumerImpl addConsumerInternal(final ConsumerTarget target, + FilterManager filters, + final Class<? extends ServerMessage> messageClass, + final String consumerName, + EnumSet<ConsumerImpl.Option> optionSet) + throws ExistingExclusiveConsumer, ConsumerAccessRefused, + ExistingConsumerPreventsExclusive + { if (hasExclusiveConsumer()) { throw new ExistingExclusiveConsumer(); @@ -771,7 +809,7 @@ public abstract class AbstractQueue<X ex QueueConsumerImpl consumer = new QueueConsumerImpl(this, target, consumerName, - filters, + filters, messageClass, optionSet); @@ -820,7 +858,6 @@ public abstract class AbstractQueue<X ex deliverAsync(); return consumer; - } @Override @@ -832,7 +869,7 @@ public abstract class AbstractQueue<X ex - synchronized void unregisterConsumer(final QueueConsumerImpl consumer) + void unregisterConsumer(final QueueConsumerImpl consumer) { if (consumer == null) { @@ -843,7 +880,7 @@ public abstract class AbstractQueue<X ex if (removed) { - consumer.close(); + consumer.closeAsync(); // No longer can the queue have an exclusive consumer setExclusiveSubscriber(null); @@ -1802,7 +1839,15 @@ public abstract class AbstractQueue<X ex for (BindingImpl b : bindingCopy) { - b.delete(); + // TODO - RG - Need to sort out bindings! + if(getTaskExecutor().isTaskExecutorThread()) + { + b.deleteAsync(); + } + else + { + b.delete(); + } } QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator(); @@ -1855,7 +1900,7 @@ public abstract class AbstractQueue<X ex } _deleteTaskList.clear(); - close(); + closeAsync(); deleted(); //Log Queue Deletion getEventLogger().message(_logSubject, QueueMessages.DELETED()); @@ -2661,7 +2706,7 @@ public abstract class AbstractQueue<X ex return allowed; } - private synchronized void updateExclusivityPolicy(ExclusivityPolicy desiredPolicy) + private void updateExclusivityPolicy(ExclusivityPolicy desiredPolicy) throws ExistingConsumerPreventsExclusive { if(desiredPolicy == null) @@ -2863,24 +2908,27 @@ public abstract class AbstractQueue<X ex //============= @StateTransition(currentState = {State.UNINITIALIZED,State.ERRORED}, desiredState = State.ACTIVE) - private void activate() + private ListenableFuture<Void> activate() { setState(State.ACTIVE); + return Futures.immediateFuture(null); } @StateTransition(currentState = State.UNINITIALIZED, desiredState = State.DELETED) - private void doDeleteBeforeInitialize() + private ListenableFuture<Void> doDeleteBeforeInitialize() { preSetAlternateExchange(); setState(State.DELETED); + return Futures.immediateFuture(null); } @StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED) - private void doDelete() + private ListenableFuture<Void> doDelete() { _virtualHost.removeQueue(this); preSetAlternateExchange(); setState(State.DELETED); + return Futures.immediateFuture(null); } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1665306&r1=1665305&r2=1665306&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java Mon Mar 9 17:12:14 2015 @@ -191,7 +191,7 @@ class QueueConsumerImpl if(newState == ConsumerTarget.State.CLOSED && oldState != newState && !_closed.get()) { - close(); + closeAsync(); } final StateChangeListener<? super QueueConsumerImpl, State> stateListener = getStateListener(); if(stateListener != null) Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java?rev=1665306&r1=1665305&r2=1665306&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java Mon Mar 9 17:12:14 2015 @@ -22,6 +22,8 @@ package org.apache.qpid.server.queue; import java.util.Map; +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.ConfiguredObjectFactory; import org.apache.qpid.server.model.Port; @@ -49,6 +51,14 @@ public class QueueFactory<X extends Queu } @Override + public ListenableFuture<X> createAsync(final ConfiguredObjectFactory factory, + final Map<String, Object> attributes, + final ConfiguredObject<?>... parents) + { + return getQueueFactory(factory, attributes).createAsync(factory, attributes, parents); + } + + @Override public UnresolvedConfiguredObject<X> recover(final ConfiguredObjectFactory factory, final ConfiguredObjectRecord record, final ConfiguredObject<?>... parents) Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java?rev=1665306&r1=1665305&r2=1665306&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java Mon Mar 9 17:12:14 2015 @@ -38,6 +38,9 @@ import java.util.Set; import javax.net.ssl.KeyManager; import javax.net.ssl.KeyManagerFactory; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.model.AbstractConfiguredObject; import org.apache.qpid.server.model.Broker; @@ -98,7 +101,7 @@ public class FileKeyStoreImpl extends Ab } @StateTransition(currentState = {State.ACTIVE, State.ERRORED}, desiredState = State.DELETED) - protected void doDelete() + protected ListenableFuture<Void> doDelete() { // verify that it is not in use String storeName = getName(); @@ -113,12 +116,14 @@ public class FileKeyStoreImpl extends Ab } deleted(); setState(State.DELETED); + return Futures.immediateFuture(null); } @StateTransition(currentState = {State.UNINITIALIZED, State.ERRORED}, desiredState = State.ACTIVE) - protected void doActivate() + protected ListenableFuture<Void> doActivate() { setState(State.ACTIVE); + return Futures.immediateFuture(null); } @Override Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java?rev=1665306&r1=1665305&r2=1665306&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java Mon Mar 9 17:12:14 2015 @@ -38,6 +38,9 @@ import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManagerFactory; import javax.net.ssl.X509TrustManager; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.model.AbstractConfiguredObject; import org.apache.qpid.server.model.AuthenticationProvider; @@ -98,7 +101,7 @@ public class FileTrustStoreImpl extends } @StateTransition(currentState = {State.ACTIVE, State.ERRORED}, desiredState = State.DELETED) - protected void doDelete() + protected ListenableFuture<Void> doDelete() { // verify that it is not in use String storeName = getName(); @@ -139,12 +142,14 @@ public class FileTrustStoreImpl extends } deleted(); setState(State.DELETED); + return Futures.immediateFuture(null); } @StateTransition(currentState = {State.UNINITIALIZED, State.ERRORED}, desiredState = State.ACTIVE) - protected void doActivate() + protected ListenableFuture<Void> doActivate() { setState(State.ACTIVE); + return Futures.immediateFuture(null); } @Override Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStoreImpl.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStoreImpl.java?rev=1665306&r1=1665305&r2=1665306&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStoreImpl.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStoreImpl.java Mon Mar 9 17:12:14 2015 @@ -57,6 +57,8 @@ import javax.net.ssl.KeyManager; import javax.net.ssl.KeyManagerFactory; import javax.xml.bind.DatatypeConverter; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.IllegalConfigurationException; @@ -184,7 +186,7 @@ public class NonJavaKeyStoreImpl extends } @StateTransition(currentState = {State.ACTIVE, State.ERRORED}, desiredState = State.DELETED) - protected void doDelete() + protected ListenableFuture<Void> doDelete() { // verify that it is not in use String storeName = getName(); @@ -202,12 +204,14 @@ public class NonJavaKeyStoreImpl extends } deleted(); setState(State.DELETED); + return Futures.immediateFuture(null); } @StateTransition(currentState = {State.UNINITIALIZED, State.ERRORED}, desiredState = State.ACTIVE) - protected void doActivate() + protected ListenableFuture<Void> doActivate() { setState(State.ACTIVE); + return Futures.immediateFuture(null); } @Override Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaTrustStoreImpl.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaTrustStoreImpl.java?rev=1665306&r1=1665305&r2=1665306&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaTrustStoreImpl.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaTrustStoreImpl.java Mon Mar 9 17:12:14 2015 @@ -45,6 +45,8 @@ import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManagerFactory; import javax.security.auth.x500.X500Principal; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.IllegalConfigurationException; @@ -172,7 +174,7 @@ public class NonJavaTrustStoreImpl } @StateTransition(currentState = {State.ACTIVE, State.ERRORED}, desiredState = State.DELETED) - protected void doDelete() + protected ListenableFuture<Void> doDelete() { // verify that it is not in use String storeName = getName(); @@ -215,12 +217,14 @@ public class NonJavaTrustStoreImpl } deleted(); setState(State.DELETED); + return Futures.immediateFuture(null); } @StateTransition(currentState = {State.UNINITIALIZED, State.ERRORED}, desiredState = State.ACTIVE) - protected void doActivate() + protected ListenableFuture<Void> doActivate() { setState(State.ACTIVE); + return Futures.immediateFuture(null); } @Override Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java?rev=1665306&r1=1665305&r2=1665306&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java Mon Mar 9 17:12:14 2015 @@ -28,6 +28,9 @@ import java.util.List; import java.util.Map; import java.util.Set; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.IllegalConfigurationException; @@ -175,13 +178,14 @@ public abstract class AbstractAuthentica } @StateTransition( currentState = State.UNINITIALIZED, desiredState = State.QUIESCED ) - protected void startQuiesced() + protected ListenableFuture<Void> startQuiesced() { setState(State.QUIESCED); + return Futures.immediateFuture(null); } @StateTransition( currentState = { State.UNINITIALIZED, State.QUIESCED, State.QUIESCED }, desiredState = State.ACTIVE ) - protected void activate() + protected ListenableFuture<Void> activate() { try { @@ -199,11 +203,11 @@ public abstract class AbstractAuthentica throw e; } } - + return Futures.immediateFuture(null); } @StateTransition( currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED}, desiredState = State.DELETED) - protected void doDelete() + protected ListenableFuture<Void> doDelete() { String providerName = getName(); @@ -219,15 +223,50 @@ public abstract class AbstractAuthentica } } - close(); - if (_preferencesProvider != null) - { - _preferencesProvider.delete(); - } - deleted(); + final SettableFuture<Void> returnVal = SettableFuture.create(); - setState(State.DELETED); + final ListenableFuture<Void> future = closeAsync(); + future.addListener(new Runnable() + { + @Override + public void run() + { + if (_preferencesProvider != null) + { + _preferencesProvider.deleteAsync().addListener(new Runnable() + { + @Override + public void run() + { + try + { + deleted(); + setState(State.DELETED); + } + finally + { + returnVal.set(null); + } + } + }, getTaskExecutor().getExecutor()); + } + else + { + try + { + deleted(); + + setState(State.DELETED); + } + finally + { + returnVal.set(null); + } + } + } + }, getTaskExecutor().getExecutor()); + return returnVal; } @Override Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ConfigModelPasswordManagingAuthenticationProvider.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ConfigModelPasswordManagingAuthenticationProvider.java?rev=1665306&r1=1665305&r2=1665306&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ConfigModelPasswordManagingAuthenticationProvider.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ConfigModelPasswordManagingAuthenticationProvider.java Mon Mar 9 17:12:14 2015 @@ -98,22 +98,15 @@ public abstract class ConfigModelPasswor @Override public void deleteUser(final String user) throws AccountNotFoundException { - runTask(new VoidTaskWithException<AccountNotFoundException>() + final ManagedUser authUser = getUser(user); + if(authUser != null) { - @Override - public void execute() throws AccountNotFoundException - { - final ManagedUser authUser = getUser(user); - if(authUser != null) - { - authUser.delete(); - } - else - { - throw new AccountNotFoundException("No such user: '" + user + "'"); - } - } - }); + authUser.delete(); + } + else + { + throw new AccountNotFoundException("No such user: '" + user + "'"); + } } @Override Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ManagedUser.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ManagedUser.java?rev=1665306&r1=1665305&r2=1665306&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ManagedUser.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ManagedUser.java Mon Mar 9 17:12:14 2015 @@ -27,6 +27,9 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.configuration.updater.VoidTask; import org.apache.qpid.server.model.AbstractConfiguredObject; import org.apache.qpid.server.model.ConfiguredObject; @@ -96,10 +99,11 @@ class ManagedUser extends AbstractConfig } @StateTransition(currentState = {State.ACTIVE}, desiredState = State.DELETED) - private void doDelete() + private ListenableFuture<Void> doDelete() { _authenticationManager.getUserMap().remove(getName()); deleted(); + return Futures.immediateFuture(null); } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java?rev=1665306&r1=1665305&r2=1665306&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java Mon Mar 9 17:12:14 2015 @@ -40,6 +40,9 @@ import javax.security.auth.login.Account import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.BrokerProperties; @@ -119,16 +122,9 @@ public abstract class PrincipalDatabaseA super.onOpen(); _principalDatabase = createDatabase(); initialise(); - List<Principal> users = _principalDatabase == null ? Collections.<Principal>emptyList() : _principalDatabase.getUsers(); - for (Principal user : users) - { - PrincipalAdapter principalAdapter = new PrincipalAdapter(user); - principalAdapter.registerWithParents(); - principalAdapter.open(); - _userMap.put(user, principalAdapter); - } } + protected abstract PrincipalDatabase createDatabase(); @@ -217,9 +213,44 @@ public abstract class PrincipalDatabaseA return _principalDatabase; } + @StateTransition(currentState = {State.UNINITIALIZED,State.ERRORED}, desiredState = State.ACTIVE) + public ListenableFuture<Void> activate() + { + final SettableFuture<Void> returnVal = SettableFuture.create(); + final List<Principal> users = _principalDatabase == null ? Collections.<Principal>emptyList() : _principalDatabase.getUsers(); + _userMap.clear(); + if(!users.isEmpty()) + { + for (final Principal user : users) + { + final PrincipalAdapter principalAdapter = new PrincipalAdapter(user); + principalAdapter.registerWithParents(); + principalAdapter.openAsync().addListener(new Runnable() + { + @Override + public void run() + { + _userMap.put(user, principalAdapter); + if (_userMap.size() == users.size()) + { + setState(State.ACTIVE); + returnVal.set(null); + } + } + }, getTaskExecutor().getExecutor()); + + } + + return returnVal; + } + else + { + return Futures.immediateFuture(null); + } + } - @StateTransition( currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED}, desiredState = State.DELETED) - public void doDelete() + @StateTransition( currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED, State.UNINITIALIZED}, desiredState = State.DELETED) + public ListenableFuture<Void> doDelete() { File file = new File(_path); if (file.exists() && file.isFile()) @@ -228,6 +259,7 @@ public abstract class PrincipalDatabaseA } deleted(); setState(State.DELETED); + return Futures.immediateFuture(null); } @Override @@ -465,13 +497,14 @@ public abstract class PrincipalDatabaseA } @StateTransition(currentState = {State.UNINITIALIZED,State.ERRORED}, desiredState = State.ACTIVE) - private void activate() + private ListenableFuture<Void> activate() { setState(State.ACTIVE); + return Futures.immediateFuture(null); } @StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED) - private void doDelete() + private ListenableFuture<Void> doDelete() { try { @@ -489,7 +522,7 @@ public abstract class PrincipalDatabaseA { LOGGER.warn("Failed to delete user " + _user, e); } - + return Futures.immediateFuture(null); } @Override Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupImpl.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupImpl.java?rev=1665306&r1=1665305&r2=1665306&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupImpl.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupImpl.java Mon Mar 9 17:12:14 2015 @@ -22,6 +22,9 @@ package org.apache.qpid.server.security. import java.util.Map; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.model.AbstractConfiguredObject; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Group; @@ -77,16 +80,18 @@ public class GroupImpl extends AbstractC @StateTransition( currentState = { State.UNINITIALIZED, State.QUIESCED, State.ERRORED }, desiredState = State.ACTIVE ) - private void activate() + private ListenableFuture<Void> activate() { setState(State.ACTIVE); + return Futures.immediateFuture(null); } @StateTransition(currentState = {State.ACTIVE}, desiredState = State.DELETED) - private void doDelete() + private ListenableFuture<Void> doDelete() { deleted(); + return Futures.immediateFuture(null); } } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupMemberImpl.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupMemberImpl.java?rev=1665306&r1=1665305&r2=1665306&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupMemberImpl.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupMemberImpl.java Mon Mar 9 17:12:14 2015 @@ -23,6 +23,9 @@ package org.apache.qpid.server.security. import java.security.Principal; import java.util.Map; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.model.AbstractConfiguredObject; import org.apache.qpid.server.model.Group; import org.apache.qpid.server.model.GroupMember; @@ -61,15 +64,17 @@ public class GroupMemberImpl extends Abs @StateTransition( currentState = { State.UNINITIALIZED, State.QUIESCED, State.ERRORED }, desiredState = State.ACTIVE ) - private void activate() + private ListenableFuture<Void> activate() { setState(State.ACTIVE); + return Futures.immediateFuture(null); } @StateTransition(currentState = {State.ACTIVE}, desiredState = State.DELETED) - private void doDelete() + private ListenableFuture<Void> doDelete() { deleted(); + return Futures.immediateFuture(null); } } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupProviderImpl.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupProviderImpl.java?rev=1665306&r1=1665305&r2=1665306&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupProviderImpl.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupProviderImpl.java Mon Mar 9 17:12:14 2015 @@ -26,6 +26,9 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.model.AbstractConfiguredObject; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfiguredObject; @@ -89,16 +92,18 @@ public class GroupProviderImpl extends A } @StateTransition( currentState = { State.UNINITIALIZED, State.QUIESCED, State.ERRORED }, desiredState = State.ACTIVE ) - private void activate() + private ListenableFuture<Void> activate() { setState(State.ACTIVE); + return Futures.immediateFuture(null); } @StateTransition(currentState = {State.ACTIVE}, desiredState = State.DELETED) - private void doDelete() + private ListenableFuture<Void> doDelete() { deleted(); + return Futures.immediateFuture(null); } } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1665306&r1=1665305&r2=1665306&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Mon Mar 9 17:12:14 2015 @@ -38,11 +38,15 @@ import java.util.concurrent.ScheduledFut import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import javax.security.auth.Subject; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.apache.log4j.Logger; import org.apache.qpid.exchange.ExchangeDefaults; @@ -388,27 +392,65 @@ public abstract class AbstractVirtualHos return isStoreEmptyHandler.isEmpty(); } - protected void createDefaultExchanges() + protected ListenableFuture<Void> createDefaultExchanges() { - Subject.doAs(getSecurityManager().getSubjectWithAddedSystemRights(), new PrivilegedAction<Void>() + return Subject.doAs(getSecurityManager().getSubjectWithAddedSystemRights(), new PrivilegedAction<ListenableFuture<Void>>() { + private static final int TOTAL_STANDARD_EXCHANGES = 4; + private final AtomicInteger _createdExchangeCount = new AtomicInteger(); + private SettableFuture<Void> _future = SettableFuture.create(); + @Override - public Void run() + public ListenableFuture<Void> run() { addStandardExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); addStandardExchange(ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS); addStandardExchange(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS); addStandardExchange(ExchangeDefaults.FANOUT_EXCHANGE_NAME, ExchangeDefaults.FANOUT_EXCHANGE_CLASS); - return null; + return _future; + } + + private void standardExchangeCreated() + { + if(_createdExchangeCount.incrementAndGet() == TOTAL_STANDARD_EXCHANGES) + { + _future.set(null); + } } - void addStandardExchange(String name, String type) + ListenableFuture<Void> addStandardExchange(String name, String type) { + Map<String, Object> attributes = new HashMap<String, Object>(); attributes.put(Exchange.NAME, name); attributes.put(Exchange.TYPE, type); attributes.put(Exchange.ID, UUIDGenerator.generateExchangeUUID(name, getName())); - childAdded(addExchange(attributes)); + final ListenableFuture<ExchangeImpl> future = addExchangeAsync(attributes); + final SettableFuture<Void> returnVal = SettableFuture.create(); + Futures.addCallback(future, new FutureCallback<ExchangeImpl>() + { + @Override + public void onSuccess(final ExchangeImpl result) + { + try + { + childAdded(result); + } + finally + { + standardExchangeCreated(); + } + + } + + @Override + public void onFailure(final Throwable t) + { + standardExchangeCreated(); + } + }, getTaskExecutor().getExecutor()); + + return returnVal; } }); } @@ -777,6 +819,23 @@ public abstract class AbstractVirtualHos } + private ListenableFuture<ExchangeImpl> addExchangeAsync(Map<String,Object> attributes) + throws ExchangeExistsException, ReservedExchangeNameException, + NoFactoryForTypeException + { + try + { + ListenableFuture result = getObjectFactory().createAsync(Exchange.class, attributes, this); + return result; + } + catch (DuplicateNameException e) + { + throw new ExchangeExistsException(getExchange(e.getName())); + } + + } + + @Override public void removeExchange(ExchangeImpl exchange, boolean force) throws ExchangeIsAlternateException, RequiredExchangeException @@ -809,7 +868,6 @@ public abstract class AbstractVirtualHos @Override protected ListenableFuture<Void> beforeClose() { - _logger.debug("KWDEBUG setting state to UNAVAILABLE"); setState(State.UNAVAILABLE); return super.beforeClose(); @@ -818,7 +876,6 @@ public abstract class AbstractVirtualHos @Override protected void onClose() { - _logger.debug("KWDEBUG onClose"); //Stop Connections _connectionRegistry.close(); _dtxRegistry.close(); @@ -830,7 +887,6 @@ public abstract class AbstractVirtualHos private void closeMessageStore() { - _logger.debug("KWDEBUG closeMessageStore"); if (getMessageStore() != null) { try @@ -1312,38 +1368,76 @@ public abstract class AbstractVirtualHos } @StateTransition( currentState = { State.UNINITIALIZED, State.ACTIVE, State.ERRORED }, desiredState = State.STOPPED ) - protected void doStop() + protected ListenableFuture<Void> doStop() { - // TODO - need to deal with async close children - closeChildren(); - shutdownHouseKeeping(); - closeMessageStore(); - setState(State.STOPPED); + final SettableFuture<Void> returnVal = SettableFuture.create(); + closeChildren().addListener( + new Runnable() + { + @Override + public void run() + { + try + { + shutdownHouseKeeping(); + closeMessageStore(); + setState(State.STOPPED); + + } + finally + { + returnVal.set(null); + } + } + }, getTaskExecutor().getExecutor() + ); + return returnVal; } @StateTransition( currentState = { State.ACTIVE, State.ERRORED }, desiredState = State.DELETED ) - private void doDelete() + private ListenableFuture<Void> doDelete() { if(_deleted.compareAndSet(false,true)) { + final SettableFuture<Void> returnVal = SettableFuture.create(); String hostName = getName(); - close(); + closeAsync().addListener( + new Runnable() + { + @Override + public void run() + { + try + { + MessageStore ms = getMessageStore(); + if (ms != null) + { + try + { + ms.onDelete(AbstractVirtualHost.this); + } + catch (Exception e) + { + _logger.warn("Exception occurred on message store deletion", e); + } + } + deleted(); + setState(State.DELETED); + } + finally + { + returnVal.set(null); + } + } + }, getTaskExecutor().getExecutor() + ); - MessageStore ms = getMessageStore(); - if (ms != null) - { - try - { - ms.onDelete(this); - } - catch (Exception e) - { - _logger.warn("Exception occurred on message store deletion", e); - } - } - deleted(); - setState(State.DELETED); + return returnVal; + } + else + { + return Futures.immediateFuture(null); } } @@ -1532,7 +1626,7 @@ public abstract class AbstractVirtualHos } @StateTransition( currentState = { State.UNINITIALIZED,State.ERRORED }, desiredState = State.ACTIVE ) - private void onActivate() + private ListenableFuture<Void> onActivate() { _houseKeepingTasks = new ScheduledThreadPoolExecutor(getHousekeepingThreadCount(), new SuppressingInheritedAccessControlContextThreadFactory()); @@ -1552,9 +1646,28 @@ public abstract class AbstractVirtualHos if (isStoreEmpty()) { - createDefaultExchanges(); + final SettableFuture<Void> returnVal = SettableFuture.create(); + createDefaultExchanges().addListener(new Runnable() + { + @Override + public void run() + { + postCreateDefaultExchangeTasks(); + returnVal.set(null); + } + }, getTaskExecutor().getExecutor()); + return returnVal; } + else + { + postCreateDefaultExchangeTasks(); + return Futures.immediateFuture(null); + } + } + + private void postCreateDefaultExchangeTasks() + { if(getContextValue(Boolean.class, USE_ASYNC_RECOVERY)) { _messageStoreRecoverer = new AsynchronousMessageStoreRecoverer(); @@ -1589,9 +1702,32 @@ public abstract class AbstractVirtualHos scheduleHouseKeepingTask(getHousekeepingCheckPeriod(), _fileSystemSpaceChecker); } } + private static class ChildCounter + { + private final AtomicInteger _count = new AtomicInteger(); + private final Runnable _task; + + private ChildCounter(final Runnable task) + { + _task = task; + } + + public void incrementCount() + { + _count.incrementAndGet(); + } + + public void decrementCount() + { + if(_count.decrementAndGet() == 0) + { + _task.run(); + } + } + } @StateTransition( currentState = { State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE ) - private void onRestart() + private ListenableFuture<Void> onRestart() { resetStatistics(); @@ -1622,6 +1758,25 @@ public abstract class AbstractVirtualHos new GenericRecoverer(this).recover(records); + final SettableFuture<Void> returnVal = SettableFuture.create(); + final ChildCounter counter = new ChildCounter(new Runnable() + { + @Override + public void run() + { + onActivate().addListener( + new Runnable() + { + @Override + public void run() + { + returnVal.set(null); + } + }, getTaskExecutor().getExecutor() + ); + } + }); + counter.incrementCount(); Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(), new PrivilegedAction<Object>() { @Override @@ -1632,14 +1787,22 @@ public abstract class AbstractVirtualHos @Override public void performAction(final ConfiguredObject<?> object) { - object.open(); + counter.incrementCount(); + object.openAsync().addListener(new Runnable() + { + @Override + public void run() + { + counter.decrementCount(); + } + }, getTaskExecutor().getExecutor()); } }); return null; } }); - - onActivate(); + counter.decrementCount(); + return returnVal; } private class FileSystemSpaceChecker extends HouseKeepingTask Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java?rev=1665306&r1=1665305&r2=1665306&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java Mon Mar 9 17:12:14 2015 @@ -29,6 +29,8 @@ import java.util.Map; import javax.security.auth.Subject; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.IllegalConfigurationException; @@ -68,7 +70,7 @@ public abstract class AbstractStandardVi } @Override - protected void activate() + protected ListenableFuture<Void> activate() { if (LOGGER.isDebugEnabled()) { @@ -107,15 +109,21 @@ public abstract class AbstractStandardVi if (host != null) { final VirtualHost<?,?,?> recoveredHost = host; - Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(), new PrivilegedAction<Object>() - { - @Override - public Object run() - { - recoveredHost.open(); - return null; - } - }); + final ListenableFuture<Void> openFuture = Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(), + new PrivilegedAction<ListenableFuture<Void>>() + { + @Override + public ListenableFuture<Void> run() + { + return recoveredHost.openAsync(); + + } + }); + return openFuture; + } + else + { + return Futures.immediateFuture(null); } } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java?rev=1665306&r1=1665305&r2=1665306&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java Mon Mar 9 17:12:14 2015 @@ -38,7 +38,10 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.apache.log4j.Logger; import org.apache.qpid.exchange.ExchangeDefaults; @@ -122,16 +125,47 @@ public abstract class AbstractVirtualHos } @StateTransition( currentState = {State.UNINITIALIZED, State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE ) - protected void doActivate() + protected ListenableFuture<Void> doActivate() { + final SettableFuture<Void> returnVal = SettableFuture.create(); + try { - activate(); - setState(State.ACTIVE); + Futures.addCallback(activate(), + new FutureCallback<Void>() + { + @Override + public void onSuccess(final Void result) + { + try + { + setState(State.ACTIVE); + } + finally + { + returnVal.set(null); + } + + } + + @Override + public void onFailure(final Throwable t) + { + + setState(State.ERRORED); + returnVal.set(null); + if (_broker.isManagementMode()) + { + LOGGER.warn("Failed to make " + this + " active.", t); + } + } + }, getTaskExecutor().getExecutor() + ); } catch(RuntimeException e) { setState(State.ERRORED); + returnVal.set(null); if (_broker.isManagementMode()) { LOGGER.warn("Failed to make " + this + " active.", e); @@ -141,6 +175,7 @@ public abstract class AbstractVirtualHos throw e; } } + return returnVal; } @Override @@ -183,40 +218,73 @@ public abstract class AbstractVirtualHos } @StateTransition( currentState = { State.ACTIVE, State.STOPPED, State.ERRORED}, desiredState = State.DELETED ) - protected void doDelete() + protected ListenableFuture<Void> doDelete() { + final SettableFuture<Void> returnVal = SettableFuture.create(); setState(State.DELETED); deleteVirtualHostIfExists(); - final ListenableFuture<Void> closeFuture = close(); - deleted(); - DurableConfigurationStore configurationStore = getConfigurationStore(); - if (configurationStore != null) + final ListenableFuture<Void> closeFuture = closeAsync(); + closeFuture.addListener(new Runnable() { - configurationStore.onDelete(this); - } + @Override + public void run() + { + try + { + deleted(); + DurableConfigurationStore configurationStore = getConfigurationStore(); + if (configurationStore != null) + { + configurationStore.onDelete(AbstractVirtualHostNode.this); + } + } + finally + { + returnVal.set(null); + } + } + }, getTaskExecutor().getExecutor()); + + return returnVal; + } - protected void deleteVirtualHostIfExists() + protected ListenableFuture<Void> deleteVirtualHostIfExists() { VirtualHost<?, ?, ?> virtualHost = getVirtualHost(); if (virtualHost != null) { - virtualHost.delete(); + return virtualHost.deleteAsync(); + } + else + { + return Futures.immediateFuture(null); } } @StateTransition( currentState = { State.ACTIVE, State.ERRORED, State.UNINITIALIZED }, desiredState = State.STOPPED ) - protected void doStop() + protected ListenableFuture<Void> doStop() { - stopAndSetStateTo(State.STOPPED); + return stopAndSetStateTo(State.STOPPED); } - protected void stopAndSetStateTo(State stoppedState) + protected ListenableFuture<Void> stopAndSetStateTo(final State stoppedState) { - // TODO - deal with async close children - closeChildren(); - closeConfigurationStoreSafely(); - setState(stoppedState); + final SettableFuture<Void> returnVal = SettableFuture.create(); + + ListenableFuture<Void> childCloseFuture = closeChildren(); + childCloseFuture.addListener(new Runnable() + { + @Override + public void run() + { + closeConfigurationStoreSafely(); + setState(stoppedState); + returnVal.set(null); + } + }, getTaskExecutor().getExecutor()); + + return returnVal; } @Override @@ -311,7 +379,7 @@ public abstract class AbstractVirtualHos protected abstract DurableConfigurationStore createConfigurationStore(); - protected abstract void activate(); + protected abstract ListenableFuture<Void> activate(); Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java?rev=1665306&r1=1665305&r2=1665306&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java Mon Mar 9 17:12:14 2015 @@ -24,6 +24,8 @@ import java.util.Collection; import java.util.Collections; import java.util.Map; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,7 +65,7 @@ public class RedirectingVirtualHostNodeI } @StateTransition( currentState = {State.UNINITIALIZED, State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE ) - protected void doActivate() + protected ListenableFuture<Void> doActivate() { try { @@ -83,6 +85,7 @@ public class RedirectingVirtualHostNodeI throw e; } } + return Futures.immediateFuture(null); } @Override Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java?rev=1665306&r1=1665305&r2=1665306&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java Mon Mar 9 17:12:14 2015 @@ -151,6 +151,7 @@ public class HeadersBindingTest extends _count++; _queue = mock(AMQQueue.class); TaskExecutor executor = new CurrentThreadTaskExecutor(); + executor.start(); VirtualHostImpl vhost = mock(VirtualHostImpl.class); when(_queue.getVirtualHost()).thenReturn(vhost); when(_queue.getModel()).thenReturn(BrokerModel.getInstance()); @@ -158,6 +159,7 @@ public class HeadersBindingTest extends when(vhost.getSecurityManager()).thenReturn(mock(org.apache.qpid.server.security.SecurityManager.class)); final EventLogger eventLogger = new EventLogger(); when(vhost.getEventLogger()).thenReturn(eventLogger); + when(vhost.getTaskExecutor()).thenReturn(executor); _exchange = mock(ExchangeImpl.class); when(_exchange.getType()).thenReturn(ExchangeDefaults.HEADERS_EXCHANGE_CLASS); when(_exchange.getEventLogger()).thenReturn(eventLogger); Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java?rev=1665306&r1=1665305&r2=1665306&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java Mon Mar 9 17:12:14 2015 @@ -22,6 +22,7 @@ package org.apache.qpid.server.model; import static java.util.Arrays.asList; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; @@ -33,12 +34,15 @@ import static org.mockito.Mockito.verify import static org.mockito.Mockito.when; import java.security.AccessControlException; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.UUID; +import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatcher; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -55,6 +59,7 @@ import org.apache.qpid.server.security.a import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; +import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.TestMemoryVirtualHost; import org.apache.qpid.test.utils.QpidTestCase; @@ -413,7 +418,30 @@ public class VirtualHostTest extends Qpi private AMQConnectionModel createMockProtocolConnection(final VirtualHost<?, ?, ?> virtualHost) { final AMQConnectionModel connection = mock(AMQConnectionModel.class); + final List<Action<?>> tasks = new ArrayList<>(); + final ArgumentCaptor<Action> deleteTaskCaptor = ArgumentCaptor.forClass(Action.class); + Answer answer = new Answer() + { + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable + { + return tasks.add(deleteTaskCaptor.getValue()); + } + }; + doAnswer(answer).when(connection).addDeleteTask(deleteTaskCaptor.capture()); when(connection.getVirtualHost()).thenReturn(virtualHost); + doAnswer(new Answer() + { + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable + { + for(Action action : tasks) + { + action.performAction(connection); + } + return null; + } + }).when(connection).closeAsync(any(AMQConstant.class),anyString()); when(connection.getRemoteAddressString()).thenReturn("peer:1234"); return connection; } Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodels/lifecycle/TestConfiguredObject.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodels/lifecycle/TestConfiguredObject.java?rev=1665306&r1=1665305&r2=1665306&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodels/lifecycle/TestConfiguredObject.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodels/lifecycle/TestConfiguredObject.java Mon Mar 9 17:12:14 2015 @@ -26,6 +26,9 @@ import java.util.Collection; import java.util.Collections; import java.util.Map; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor; import org.apache.qpid.server.configuration.updater.TaskExecutor; @@ -127,15 +130,17 @@ public class TestConfiguredObject extend } @StateTransition( currentState = {State.ERRORED, State.UNINITIALIZED}, desiredState = State.ACTIVE ) - protected void activate() + protected ListenableFuture<Void> activate() { setState(State.ACTIVE); + return Futures.immediateFuture(null); } @StateTransition( currentState = {State.ERRORED, State.UNINITIALIZED}, desiredState = State.DELETED ) - protected void doDelete() + protected ListenableFuture<Void> doDelete() { setState(State.DELETED); + return Futures.immediateFuture(null); } public boolean isOpened() Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java?rev=1665306&r1=1665305&r2=1665306&view=diff ============================================================================== --- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java (original) +++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java Mon Mar 9 17:12:14 2015 @@ -31,14 +31,12 @@ import static org.mockito.Mockito.verify import static org.mockito.Mockito.when; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.UUID; import org.apache.log4j.Logger; import org.mockito.ArgumentCaptor; @@ -56,7 +54,6 @@ import org.apache.qpid.server.message.Me import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.QueueNotificationListener; import org.apache.qpid.server.model.UUIDGenerator; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org