Author: kwall
Date: Tue Apr 29 08:29:28 2014
New Revision: 1590917

URL: http://svn.apache.org/r1590917
Log:
QPID-5715: [Java Broker]: Wire up the BDB HA VirtualHostNode to the 
ReplicatedEnvironmentFacade.

* Attributes priority, quorumOverride, designatedPrimary are exposed as 
read/write attributes.
* Attribute role is readable (to observe the current role of the node), and 
writable, to request a change in mastership.
* Attributes joinTime and lastKnownReplicationTransactionId are exposed as 
derived attributes.

Modified:
    
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
    
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java
    
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
    
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
    
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java

Modified: 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java?rev=1590917&r1=1590916&r2=1590917&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
 Tue Apr 29 08:29:28 2014
@@ -81,14 +81,17 @@ import com.sleepycat.je.utilint.VLSN;
 
 public class ReplicatedEnvironmentFacade implements EnvironmentFacade, 
StateChangeListener
 {
+    public static final String MASTER_TRANSFER_TIMEOUT_PROPERTY_NAME = 
"qpid.bdb.ha.master_transfer_interval";
     public static final String DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME = 
"qpid.bdb.ha.db_ping_socket_timeout";
     public static final String REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME = 
"qpid.bdb.ha.remote_node_monitor_interval";
 
     private static final Logger LOGGER = 
Logger.getLogger(ReplicatedEnvironmentFacade.class);
 
+    private static final int DEFAULT_MASTER_TRANSFER_TIMEOUT = 1000 * 60;
     private static final int DEFAULT_DB_PING_SOCKET_TIMEOUT = 1000;
     private static final int DEFAULT_REMOTE_NODE_MONITOR_INTERVAL = 1000;
 
+    private static final int MASTER_TRANSFER_TIMEOUT = 
Integer.getInteger(MASTER_TRANSFER_TIMEOUT_PROPERTY_NAME, 
DEFAULT_MASTER_TRANSFER_TIMEOUT);
     private static final int DB_PING_SOCKET_TIMEOUT = 
Integer.getInteger(DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME, 
DEFAULT_DB_PING_SOCKET_TIMEOUT);
     private static final int REMOTE_NODE_MONITOR_INTERVAL = 
Integer.getInteger(REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME, 
DEFAULT_REMOTE_NODE_MONITOR_INTERVAL);
 
@@ -145,14 +148,13 @@ public class ReplicatedEnvironmentFacade
     private final AtomicReference<State> _state = new 
AtomicReference<State>(State.OPENING);
     private final ConcurrentMap<String, DatabaseHolder> _databases = new 
ConcurrentHashMap<String, DatabaseHolder>();
     private final AtomicReference<StateChangeListener> _stateChangeListener = 
new AtomicReference<StateChangeListener>();
+    private final AtomicBoolean _initialised;
+    private final EnvironmentFacadeTask[] _initialisationTasks;
 
     private volatile ReplicatedEnvironment _environment;
     private volatile long _joinTime;
     private volatile ReplicatedEnvironment.State _lastKnownEnvironmentState;
 
-    private AtomicBoolean _initialised;
-    private EnvironmentFacadeTask[] _initialisationTasks;
-
     public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration 
configuration, EnvironmentFacadeTask[] initialisationTasks)
     {
         _environmentDirectory = new File(configuration.getStorePath());
@@ -214,8 +216,14 @@ public class ReplicatedEnvironmentFacade
                 shutdownAndAwaitExecutorService(_environmentJobExecutor);
                 shutdownAndAwaitExecutorService(_groupChangeExecutor);
 
-                closeDatabases();
-                closeEnvironment();
+                try
+                {
+                    closeDatabases();
+                }
+                finally
+                {
+                    closeEnvironment();
+                }
             }
             finally
             {
@@ -634,10 +642,52 @@ public class ReplicatedEnvironmentFacade
         }
     }
 
+    public Future<Void> transferMasterToSelfAsynchronously()
+    {
+        final String nodeName = getNodeName();
+        return transferMasterAsynchronously(nodeName);
+    }
+
+    public Future<Void> transferMasterAsynchronously(final String nodeName)
+    {
+        return _groupChangeExecutor.submit(new Callable<Void>()
+        {
+            @Override
+            public Void call() throws Exception
+            {
+                try
+                {
+                    ReplicationGroupAdmin admin = 
createReplicationGroupAdmin();
+                    String newMaster = 
admin.transferMaster(Collections.singleton(nodeName), MASTER_TRANSFER_TIMEOUT, 
TimeUnit.MILLISECONDS, true);
+                    if (LOGGER.isDebugEnabled())
+                    {
+                        LOGGER.debug("The mastership has been transfered to " 
+ newMaster);
+                    }
+                }
+                catch (DatabaseException e)
+                {
+                    LOGGER.warn("Exception on transfering the mastership to " 
+ _prettyGroupNodeName
+                            + " Master transfer timeout : " + 
MASTER_TRANSFER_TIMEOUT, e);
+                    throw e;
+                }
+                return null;
+            }
+        });
+    }
+
+    public void removeNodeFromGroup(final String nodeName)
+    {
+        createReplicationGroupAdmin().removeMember(nodeName);
+    }
+
+    public void updateAddress(final String nodeName, final String newHostName, 
final int newPort)
+    {
+        createReplicationGroupAdmin().updateAddress(nodeName, newHostName, 
newPort);
+    }
 
     public long getJoinTime()
     {
-        return _joinTime ;
+        return _joinTime;
     }
 
     public long getLastKnownReplicationTransactionId()
@@ -669,16 +719,6 @@ public class ReplicatedEnvironmentFacade
         return members;
     }
 
-    public void removeNodeFromGroup(final String nodeName)
-    {
-        createReplicationGroupAdmin().removeMember(nodeName);
-    }
-
-    public void updateAddress(final String nodeName, final String newHostName, 
final int newPort)
-    {
-        createReplicationGroupAdmin().updateAddress(nodeName, newHostName, 
newPort);
-    }
-
     private ReplicationGroupAdmin createReplicationGroupAdmin()
     {
         final Set<InetSocketAddress> helpers = new 
HashSet<InetSocketAddress>();
@@ -690,7 +730,6 @@ public class ReplicatedEnvironmentFacade
         return new ReplicationGroupAdmin(_configuration.getGroupName(), 
helpers);
     }
 
-
     public ReplicatedEnvironment getEnvironment()
     {
         return _environment;

Modified: 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java?rev=1590917&r1=1590916&r2=1590917&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java
 Tue Apr 29 08:29:28 2014
@@ -34,7 +34,10 @@ public interface BDBHAVirtualHostNode<X 
     public static final String DESIGNATED_PRIMARY = "designatedPrimary";
     public static final String PRIORITY = "priority";
     public static final String QUORUM_OVERRIDE = "quorumOverride";
+    public static final String ROLE = "role";
     public static final String REPLICATED_ENVIRONMENT_CONFIGURATION = 
"replicatedEnvironmentConfiguration";
+    public static final String LAST_KNOWN_REPLICATION_TRANSACTION_ID = 
"lastKnownReplicationTransactionId";
+    public static final String JOIN_TIME = "joinTime";
 
     @ManagedAttribute(automate = true, mandatory=true)
     String getGroupName();
@@ -61,5 +64,14 @@ public interface BDBHAVirtualHostNode<X 
     int getQuorumOverride();
 
     @ManagedAttribute(automate = true)
+    String getRole();
+
+    @ManagedAttribute(automate = true)
     Map<String, String> getReplicatedEnvironmentConfiguration();
+
+    @ManagedAttribute(derived = true)
+    Long getLastKnownReplicationTransactionId();
+
+    @ManagedAttribute(derived = true)
+    Long getJoinTime();
 }

Modified: 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java?rev=1590917&r1=1590916&r2=1590917&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
 Tue Apr 29 08:29:28 2014
@@ -23,13 +23,16 @@ package org.apache.qpid.server.virtualho
 import java.security.PrivilegedAction;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.security.auth.Subject;
 
-import com.sleepycat.je.rep.StateChangeEvent;
-import com.sleepycat.je.rep.StateChangeListener;
 import org.apache.log4j.Logger;
-
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
 import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.BrokerModel;
@@ -42,20 +45,34 @@ import org.apache.qpid.server.model.Virt
 import org.apache.qpid.server.model.VirtualHostNode;
 import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory;
 import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.store.ConfiguredObjectRecord;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.VirtualHostStoreUpgraderAndRecoverer;
 import org.apache.qpid.server.store.berkeleydb.BDBHAVirtualHost;
 import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
 import 
org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
 import 
org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
 import org.apache.qpid.server.virtualhost.VirtualHostState;
 import org.apache.qpid.server.virtualhostnode.AbstractVirtualHostNode;
 
+import com.sleepycat.je.rep.ReplicatedEnvironment;
+import com.sleepycat.je.rep.StateChangeEvent;
+import com.sleepycat.je.rep.StateChangeListener;
+
 @ManagedObject( category = false, type = "BDB_HA" )
 public class BDBHAVirtualHostNodeImpl extends 
AbstractVirtualHostNode<BDBHAVirtualHostNodeImpl> implements 
BDBHAVirtualHostNode<BDBHAVirtualHostNodeImpl>
 {
+    /**
+     * Length of time we synchronously await the a JE mutation to complete.  
It is not considered an error if we exceed this timeout, although a
+     * a warning will be logged.
+     */
+    private static final int MUTATE_JE_TIMEOUT_MS = 100;
+
     private static final Logger LOGGER = 
Logger.getLogger(BDBHAVirtualHostNodeImpl.class);
 
+    private final AtomicReference<ReplicatedEnvironmentFacade> 
_environmentFacade = new AtomicReference<>();
+
     @ManagedAttributeField
     private Map<String, String> _environmentConfiguration;
 
@@ -77,18 +94,22 @@ public class BDBHAVirtualHostNodeImpl ex
     @ManagedAttributeField
     private boolean _coalescingSync;
 
-    @ManagedAttributeField
+    @ManagedAttributeField(afterSet="postSetDesignatedPrimary")
     private boolean _designatedPrimary;
 
-    @ManagedAttributeField
+    @ManagedAttributeField(afterSet="postSetPriority")
     private int _priority;
 
-    @ManagedAttributeField
+    @ManagedAttributeField(afterSet="postSetQuorumOverride")
     private int _quorumOverride;
 
+    @ManagedAttributeField(beforeSet="preSetRole", afterSet="postSetRole")
+    private String _role;
+
     @ManagedAttributeField
     private Map<String, String> _replicatedEnvironmentConfiguration;
 
+
     @ManagedObjectFactoryConstructor
     public BDBHAVirtualHostNodeImpl(Map<String, Object> attributes, Broker<?> 
broker)
     {
@@ -162,6 +183,39 @@ public class BDBHAVirtualHostNodeImpl ex
     }
 
     @Override
+    public String getRole()
+    {
+        ReplicatedEnvironmentFacade environmentFacade = 
_environmentFacade.get();
+        if (environmentFacade != null)
+        {
+            return environmentFacade.getNodeState();
+        }
+        return "UNKNOWN";
+    }
+
+    @Override
+    public Long getLastKnownReplicationTransactionId()
+    {
+        ReplicatedEnvironmentFacade environmentFacade = 
_environmentFacade.get();
+        if (environmentFacade != null)
+        {
+            return environmentFacade.getLastKnownReplicationTransactionId();
+        }
+        return -1L;
+    }
+
+    @Override
+    public Long getJoinTime()
+    {
+        ReplicatedEnvironmentFacade environmentFacade = 
_environmentFacade.get();
+        if (environmentFacade != null)
+        {
+            return environmentFacade.getJoinTime();
+        }
+        return -1L;
+    }
+
+    @Override
     public Map<String, String> getReplicatedEnvironmentConfiguration()
     {
         return _replicatedEnvironmentConfiguration;
@@ -171,7 +225,7 @@ public class BDBHAVirtualHostNodeImpl ex
     public String toString()
     {
         return "BDBHAVirtualHostNodeImpl [id=" + getId() + ", name=" + 
getName() + ", storePath=" + _storePath + ", groupName=" + _groupName + ", 
address=" + _address
-                + ", state=" + getState() + "]";
+                + ", state=" + getState() + ", priority=" + _priority + ", 
designatedPrimary=" + _designatedPrimary + ", designatedPrimary=" + 
_quorumOverride + "]";
     }
 
     @SuppressWarnings({ "rawtypes", "unchecked" })
@@ -223,9 +277,26 @@ public class BDBHAVirtualHostNodeImpl ex
         getEventLogger().message(getConfigurationStoreLogSubject(), 
ConfigStoreMessages.CREATED());
         getEventLogger().message(getConfigurationStoreLogSubject(), 
ConfigStoreMessages.STORE_LOCATION(getStorePath()));
 
-
         ReplicatedEnvironmentFacade environmentFacade = 
(ReplicatedEnvironmentFacade) getConfigurationStore().getEnvironmentFacade();
         environmentFacade.setStateChangeListener(new 
BDBHAMessageStoreStateChangeListener());
+        _environmentFacade.set(environmentFacade);
+    }
+
+    @Override
+    protected void stop()
+    {
+        try
+        {
+            super.stop();
+        }
+        finally
+        {
+            ReplicatedEnvironmentFacade environmentFacade = 
_environmentFacade.get();
+            if (_environmentFacade.compareAndSet(environmentFacade, null))
+            {
+                environmentFacade.close();
+            }
+        }
     }
 
     private void onMaster()
@@ -348,9 +419,188 @@ public class BDBHAVirtualHostNodeImpl ex
         }
     }
 
-    private class ReplicaVirtualHost extends BDBHAVirtualHost
+    // used as post action by field _priority
+    @SuppressWarnings("unused")
+    private void postSetPriority()
+    {
+        ReplicatedEnvironmentFacade environmentFacade = 
_environmentFacade.get();
+        if (environmentFacade != null)
+        {
+            try
+            {
+                
environmentFacade.setPriority(_priority).get(MUTATE_JE_TIMEOUT_MS, 
TimeUnit.MILLISECONDS);
+                if (LOGGER.isDebugEnabled())
+                {
+                    LOGGER.debug("Node priority changed. " + this);
+                }
+            }
+            catch (TimeoutException e)
+            {
+                LOGGER.warn("Change node priority did not complete within " + 
MUTATE_JE_TIMEOUT_MS + "ms. New value " + _priority + " will become effective 
once the JE task thread is free.");
+            }
+            catch (InterruptedException e)
+            {
+                Thread.currentThread().interrupt();
+            }
+            catch (ExecutionException e)
+            {
+                throw new ServerScopedRuntimeException("Failed to set priority 
node to value " + _priority + " on " + this, e);
+            }
+        }
+    }
+
+    // used as post action by field _designatedPrimary
+    @SuppressWarnings("unused")
+    private void postSetDesignatedPrimary()
+    {
+        ReplicatedEnvironmentFacade environmentFacade = 
_environmentFacade.get();
+        if (environmentFacade != null)
+        {
+            try
+            {
+                
environmentFacade.setDesignatedPrimary(_designatedPrimary).get(MUTATE_JE_TIMEOUT_MS,
 TimeUnit.MILLISECONDS);
+                if (LOGGER.isDebugEnabled())
+                {
+                    LOGGER.debug("Designated primary changed. " + this);
+                }
+            }
+            catch (TimeoutException e)
+            {
+                LOGGER.warn("Change designated primary did not complete within 
" + MUTATE_JE_TIMEOUT_MS + "ms. New value " + _designatedPrimary + " will 
become effective once the JE task thread is free.");
+            }
+            catch (InterruptedException e)
+            {
+                Thread.currentThread().interrupt();
+            }
+            catch (ExecutionException e)
+            {
+                throw new ServerScopedRuntimeException("Failed to set 
designated primary to value " + _designatedPrimary + " on " + this, e);
+            }
+        }
+    }
+
+    // used as post action by field _quorumOverride
+    @SuppressWarnings("unused")
+    private void postSetQuorumOverride()
+    {
+        ReplicatedEnvironmentFacade environmentFacade = 
_environmentFacade.get();
+        if (environmentFacade != null)
+        {
+            try
+            {
+                
environmentFacade.setElectableGroupSizeOverride(_quorumOverride).get(MUTATE_JE_TIMEOUT_MS,
 TimeUnit.MILLISECONDS);
+                if (LOGGER.isDebugEnabled())
+                {
+                    LOGGER.debug("Quorum override changed. " + this);
+                }
+            }
+            catch (TimeoutException e)
+            {
+                LOGGER.warn("Change quorum override did not complete within " 
+ MUTATE_JE_TIMEOUT_MS + "ms. New value " + _durability + " will become 
effective once the JE task thread is free.");
+            }
+            catch (InterruptedException e)
+            {
+                Thread.currentThread().interrupt();
+            }
+            catch (ExecutionException e)
+            {
+                throw new ServerScopedRuntimeException("Failed to set quorum 
override to value " + _quorumOverride + " on " + this, e);
+            }
+        }
+    }
+
+    // used as pre action by field _role
+    @SuppressWarnings("unused")
+    private void preSetRole()
+    {
+        ReplicatedEnvironmentFacade environmentFacade = 
_environmentFacade.get();
+        if (environmentFacade != null)
+        {
+            String currentRole = environmentFacade.getNodeState();
+            if 
(!ReplicatedEnvironment.State.REPLICA.name().equals(currentRole))
+            {
+                 throw new IllegalConfigurationException("Cannot transfer 
mastership when node is not in a replica role."
+                         + "Current role is " + currentRole);
+             }
+        }
+        else
+        {
+            // Ignored
+        }
+    }
+
+    // used as post action by field _role
+    @SuppressWarnings("unused")
+    private void postSetRole()
+    {
+        ReplicatedEnvironmentFacade environmentFacade = 
_environmentFacade.get();
+        if (environmentFacade != null)
+        {
+            try
+            {
+                
environmentFacade.transferMasterToSelfAsynchronously().get(MUTATE_JE_TIMEOUT_MS,
 TimeUnit.MILLISECONDS);
+                if (LOGGER.isDebugEnabled())
+                {
+                    LOGGER.debug("Requested master transfer to self. " + this);
+                }
+            }
+            catch (TimeoutException e)
+            {
+                LOGGER.warn("Transfer master did not complete within " + 
MUTATE_JE_TIMEOUT_MS + "ms. Node may still be elected master at a later time.");
+            }
+            catch (InterruptedException e)
+            {
+                Thread.currentThread().interrupt();
+            }
+            catch (ExecutionException e)
+            {
+                throw new ServerScopedRuntimeException("Failed to transfer 
master to " + this, e);
+            }
+        }
+        else
+        {
+            // Ignored
+        }
+    }
+
+    // TODO - need a better way of suppressing the persistence of the role 
field.
+    @Override
+    public ConfiguredObjectRecord asObjectRecord()
     {
+        final ConfiguredObjectRecord underlying = super.asObjectRecord();
+        return new ConfiguredObjectRecord()
+        {
 
+            @Override
+            public String getType()
+            {
+                return underlying.getType();
+            }
+
+            @Override
+            public Map<String, ConfiguredObjectRecord> getParents()
+            {
+                return underlying.getParents();
+            }
+
+            @Override
+            public UUID getId()
+            {
+                return underlying.getId();
+            }
+
+            @Override
+            public Map<String, Object> getAttributes()
+            {
+                Map<String, Object> copy = new HashMap<String, 
Object>(underlying.getAttributes());
+                copy.remove(BDBHAVirtualHostNode.ROLE);
+                return copy;
+            }
+        };
+    }
+
+    private class ReplicaVirtualHost extends BDBHAVirtualHost
+    {
         ReplicaVirtualHost(Map<String, Object> attributes, VirtualHostNode<?> 
virtualHostNode)
         {
             super(attributes, virtualHostNode);
@@ -372,4 +622,5 @@ public class BDBHAVirtualHostNodeImpl ex
             return super.setState(currentState, desiredState);
         }
     }
+
 }

Modified: 
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java?rev=1590917&r1=1590916&r2=1590917&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
 Tue Apr 29 08:29:28 2014
@@ -25,14 +25,13 @@ import static org.mockito.Mockito.when;
 import java.io.File;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
-import com.sleepycat.je.rep.ReplicatedEnvironment;
-import com.sleepycat.je.rep.ReplicationConfig;
-
 import org.apache.qpid.server.configuration.updater.TaskExecutor;
 import org.apache.qpid.server.configuration.updater.TaskExecutorImpl;
 import org.apache.qpid.server.model.Broker;
@@ -43,20 +42,23 @@ import org.apache.qpid.server.model.Conf
 import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.model.VirtualHostNode;
-import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.util.BrokerTestHelper;
 import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode;
 import org.apache.qpid.test.utils.QpidTestCase;
 import org.apache.qpid.util.FileUtils;
 
+import com.sleepycat.je.rep.ReplicatedEnvironment;
+import com.sleepycat.je.rep.ReplicationConfig;
+
 public class BDBHAVirtualHostNodeTest extends QpidTestCase
 {
 
     private Broker<?> _broker;
     private File _bdbStorePath;
-    private VirtualHostNode<?> _virtualHostNode;
     private TaskExecutor _taskExecutor;
+    private final ConfiguredObjectFactory _objectFactory = 
BrokerModel.getInstance().getObjectFactory();
+    private final Set<BDBHAVirtualHostNode<?>> _nodes = new HashSet<>();
 
     @Override
     protected void setUp() throws Exception
@@ -78,9 +80,24 @@ public class BDBHAVirtualHostNodeTest ex
     {
         try
         {
-            if (_virtualHostNode != null)
+            Exception firstException = null;
+            for (VirtualHostNode<?> node : _nodes)
             {
-                _virtualHostNode.setDesiredState(_virtualHostNode.getState(), 
State.STOPPED);
+                try
+                {
+                    node.setDesiredState(node.getState(), State.DELETED);
+                }
+                catch(Exception e)
+                {
+                    if (firstException != null)
+                    {
+                        firstException = e;
+                    }
+                }
+                if (firstException != null)
+                {
+                    throw firstException;
+                }
             }
         }
         finally
@@ -119,11 +136,7 @@ public class BDBHAVirtualHostNodeTest ex
         
attributes.put(BDBHAVirtualHostNode.REPLICATED_ENVIRONMENT_CONFIGURATION,
                 Collections.singletonMap(ReplicationConfig.REP_STREAM_TIMEOUT, 
repStreamTimeout));
 
-        ConfiguredObjectFactory objectFactory = 
BrokerModel.getInstance().getObjectFactory();
-        ConfiguredObjectTypeFactory factory = 
objectFactory.getConfiguredObjectTypeFactory("VirtualHostNode",
-                                                                               
               "BDB_HA");
-
-        BDBHAVirtualHostNode<?> node = (BDBHAVirtualHostNode<?>) 
factory.create(null, attributes, _broker);
+        VirtualHostNode<?> node = createHaVHN(attributes);
 
         final CountDownLatch virtualHostAddedLatch = new CountDownLatch(1);
         final CountDownLatch virtualHostStateChangeLatch = new 
CountDownLatch(1);
@@ -168,6 +181,7 @@ public class BDBHAVirtualHostNodeTest ex
         assertEquals(groupName, environment.getGroup().getName());
         assertEquals(nodeHostPort, replicationConfig.getNodeHostPort());
         assertEquals(helperHostPort, replicationConfig.getHelperHosts());
+
         assertEquals(durability, 
environment.getConfig().getDurability().toString());
         assertEquals("Unexpected JE replication stream timeout", 
repStreamTimeout, 
replicationConfig.getConfigParam(ReplicationConfig.REP_STREAM_TIMEOUT));
 
@@ -191,6 +205,129 @@ public class BDBHAVirtualHostNodeTest ex
         assertFalse("Store still exists", _bdbStorePath.exists());
     }
 
+    public void testMutableAttributes() throws Exception
+    {
+        UUID id = UUID.randomUUID();
+        String address = "localhost:" + findFreePort();
+
+        Map<String, Object> attributes = new HashMap<String, Object>();
+        attributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA");
+        attributes.put(BDBHAVirtualHostNode.ID, id);
+        attributes.put(BDBHAVirtualHostNode.NAME, "node");
+        attributes.put(BDBHAVirtualHostNode.GROUP_NAME, "group");
+        attributes.put(BDBHAVirtualHostNode.ADDRESS, address);
+        attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, address);
+        attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath);
+
+        BDBHAVirtualHostNode<?> node = createHaVHN(attributes);
+
+        assertEquals("Failed to activate node", State.ACTIVE, 
node.setDesiredState(node.getState(), State.ACTIVE));
+
+        BDBMessageStore bdbMessageStore = (BDBMessageStore) 
node.getConfigurationStore();
+        ReplicatedEnvironment environment = (ReplicatedEnvironment) 
bdbMessageStore.getEnvironmentFacade().getEnvironment();
+
+        assertEquals("Unexpected node priority value before mutation", 1, 
environment.getRepMutableConfig().getNodePriority());
+        assertFalse("Unexpected designated primary value before mutation", 
environment.getRepMutableConfig().getDesignatedPrimary());
+        assertEquals("Unexpected electable group override value before 
mutation", 0, 
environment.getRepMutableConfig().getElectableGroupSizeOverride());
+
+        node.setAttribute(BDBHAVirtualHostNode.PRIORITY, 1, 2);
+        node.setAttribute(BDBHAVirtualHostNode.DESIGNATED_PRIMARY, false, 
true);
+        node.setAttribute(BDBHAVirtualHostNode.QUORUM_OVERRIDE, 0, 1);
+
+        assertEquals("Unexpected node priority value after mutation", 2, 
environment.getRepMutableConfig().getNodePriority());
+        assertTrue("Unexpected designated primary value after mutation", 
environment.getRepMutableConfig().getDesignatedPrimary());
+        assertEquals("Unexpected electable group override value after 
mutation", 1, 
environment.getRepMutableConfig().getElectableGroupSizeOverride());
+
+        assertNotNull("Join time should be set", node.getJoinTime());
+        assertNotNull("Last known replication transaction idshould be set", 
node.getLastKnownReplicationTransactionId());
+
+    }
+
+    public void testTransferMasterToSelf() throws Exception
+    {
+        int node1PortNumber = findFreePort();
+        String helperAddress = "localhost:" + node1PortNumber;
+        String groupName = "group";
+
+        Map<String, Object> node1Attributes = new HashMap<String, Object>();
+        node1Attributes.put(BDBHAVirtualHostNode.ID, UUID.randomUUID());
+        node1Attributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA");
+        node1Attributes.put(BDBHAVirtualHostNode.NAME, "node1");
+        node1Attributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName);
+        node1Attributes.put(BDBHAVirtualHostNode.ADDRESS, helperAddress);
+        node1Attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, 
helperAddress);
+        node1Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + 
File.separator + "1");
+
+        BDBHAVirtualHostNode<?> node1 = createHaVHN(node1Attributes);
+        assertEquals("Failed to activate node", State.ACTIVE, 
node1.setDesiredState(node1.getState(), State.ACTIVE));
+
+        int node2PortNumber = getNextAvailable(node1PortNumber+1);
+
+        Map<String, Object> node2Attributes = new HashMap<String, Object>();
+        node2Attributes.put(BDBHAVirtualHostNode.ID, UUID.randomUUID());
+        node2Attributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA");
+        node2Attributes.put(BDBHAVirtualHostNode.NAME, "node2");
+        node2Attributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName);
+        node2Attributes.put(BDBHAVirtualHostNode.ADDRESS, "localhost:" + 
node2PortNumber);
+        node2Attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, 
helperAddress);
+        node2Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + 
File.separator + "2");
+
+        BDBHAVirtualHostNode<?> node2 = createHaVHN(node2Attributes);
+        assertEquals("Failed to activate node2", State.ACTIVE, 
node2.setDesiredState(node2.getState(), State.ACTIVE));
+
+        int node3PortNumber = getNextAvailable(node2PortNumber+1);
+        Map<String, Object> node3Attributes = new HashMap<String, Object>();
+        node3Attributes.put(BDBHAVirtualHostNode.ID, UUID.randomUUID());
+        node3Attributes.put(BDBHAVirtualHostNode.TYPE, "BDB_HA");
+        node3Attributes.put(BDBHAVirtualHostNode.NAME, "node3");
+        node3Attributes.put(BDBHAVirtualHostNode.GROUP_NAME, groupName);
+        node3Attributes.put(BDBHAVirtualHostNode.ADDRESS, "localhost:" + 
node3PortNumber);
+        node3Attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, 
helperAddress);
+        node3Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + 
File.separator + "3");
+        BDBHAVirtualHostNode<?> node3 = createHaVHN(node3Attributes);
+        assertEquals("Failed to activate node3", State.ACTIVE, 
node3.setDesiredState(node3.getState(), State.ACTIVE));
+
+        BDBHAVirtualHostNode<?> replica = null;
+        int findReplicaCount = 0;
+        while(replica == null)
+        {
+            for (BDBHAVirtualHostNode<?> node : _nodes)
+            {
+                if ("REPLICA".equals(node.getRole()))
+                {
+                    replica = node;
+                    break;
+                }
+            }
+
+            Thread.sleep(100);
+            if (findReplicaCount > 20)
+            {
+                fail("Could not find a node is replica role");
+            }
+            findReplicaCount++;
+        }
+
+        replica.setAttribute(BDBHAVirtualHostNode.ROLE, "REPLICA", "MASTER");
+
+        int awaitMastershipCount = 0;
+        while(!"MASTER".equals(replica.getRole()))
+        {
+            Thread.sleep(100);
+            if (awaitMastershipCount > 20)
+            {
+                fail("Replica did not assume master role");
+            }
+            awaitMastershipCount++;
+        }
+    }
+
+    private BDBHAVirtualHostNode<?> createHaVHN(Map<String, Object> attributes)
+    {
+        BDBHAVirtualHostNode<?> node = (BDBHAVirtualHostNode<?>) 
_objectFactory.create(VirtualHostNode.class, attributes, _broker);
+        _nodes.add(node);
+        return node;
+    }
 }
 
 

Modified: 
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java?rev=1590917&r1=1590916&r2=1590917&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
 Tue Apr 29 08:29:28 2014
@@ -40,6 +40,7 @@ import com.sleepycat.je.Database;
 import com.sleepycat.je.DatabaseConfig;
 import com.sleepycat.je.Durability;
 import com.sleepycat.je.Environment;
+import com.sleepycat.je.rep.ReplicatedEnvironment;
 import com.sleepycat.je.rep.ReplicatedEnvironment.State;
 import com.sleepycat.je.rep.ReplicationConfig;
 import com.sleepycat.je.rep.StateChangeEvent;
@@ -274,6 +275,123 @@ public class ReplicatedEnvironmentFacade
         assertEquals("Unexpected state " + 
replicatedEnvironmentFacade.getFacadeState(), 
ReplicatedEnvironmentFacade.State.CLOSED, 
replicatedEnvironmentFacade.getFacadeState());
     }
 
+    public void testTransferMasterToSelf() throws Exception
+    {
+        final CountDownLatch firstNodeReplicaStateLatch = new 
CountDownLatch(1);
+        final CountDownLatch firstNodeMasterStateLatch = new CountDownLatch(1);
+        StateChangeListener stateChangeListener = new StateChangeListener(){
+
+            @Override
+            public void stateChange(StateChangeEvent event) throws 
RuntimeException
+            {
+                ReplicatedEnvironment.State state = event.getState();
+                if (state == ReplicatedEnvironment.State.REPLICA)
+                {
+                    firstNodeReplicaStateLatch.countDown();
+                }
+                if (state == ReplicatedEnvironment.State.MASTER)
+                {
+                    firstNodeMasterStateLatch.countDown();
+                }
+            }
+        };
+        ReplicatedEnvironmentFacade firstNode = addNode(State.MASTER, 
stateChangeListener);
+        assertTrue("Environment did not become a master", 
firstNodeMasterStateLatch.await(10, TimeUnit.SECONDS));
+
+        int replica1Port = getNextAvailable(TEST_NODE_PORT + 1);
+        String node1NodeHostPort = "localhost:" + replica1Port;
+        ReplicatedEnvironmentFacade secondNode = createReplica(TEST_NODE_NAME 
+ "_1", node1NodeHostPort);
+        assertEquals("Unexpected state", 
ReplicatedEnvironment.State.REPLICA.name(), secondNode.getNodeState());
+
+        int replica2Port = getNextAvailable(replica1Port + 1);
+        String node2NodeHostPort = "localhost:" + replica2Port;
+        final CountDownLatch replicaStateLatch = new CountDownLatch(1);
+        final CountDownLatch masterStateLatch = new CountDownLatch(1);
+        StateChangeListener testStateChangeListener = new StateChangeListener()
+        {
+            @Override
+            public void stateChange(StateChangeEvent event) throws 
RuntimeException
+            {
+                ReplicatedEnvironment.State state = event.getState();
+                if (state == ReplicatedEnvironment.State.REPLICA)
+                {
+                    replicaStateLatch.countDown();
+                }
+                if (state == ReplicatedEnvironment.State.MASTER)
+                {
+                    masterStateLatch.countDown();
+                }
+            }
+        };
+        ReplicatedEnvironmentFacade thirdNode = addNode(TEST_NODE_NAME + "_2", 
node2NodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, 
testStateChangeListener);
+        assertTrue("Environment did not become a replica", 
replicaStateLatch.await(10, TimeUnit.SECONDS));
+        assertEquals(3, thirdNode.getNumberOfElectableGroupMembers());
+
+        thirdNode.transferMasterToSelfAsynchronously();
+        assertTrue("Environment did not become a master", 
masterStateLatch.await(10, TimeUnit.SECONDS));
+        assertTrue("First node environment did not become a replica", 
firstNodeReplicaStateLatch.await(10, TimeUnit.SECONDS));
+        assertEquals("Unexpected state", 
ReplicatedEnvironment.State.REPLICA.name(), firstNode.getNodeState());
+    }
+
+    public void testTransferMasterAnotherNode() throws Exception
+    {
+        final CountDownLatch firstNodeReplicaStateLatch = new 
CountDownLatch(1);
+        final CountDownLatch firstNodeMasterStateLatch = new CountDownLatch(1);
+        StateChangeListener stateChangeListener = new StateChangeListener(){
+
+            @Override
+            public void stateChange(StateChangeEvent event) throws 
RuntimeException
+            {
+                ReplicatedEnvironment.State state = event.getState();
+                if (state == ReplicatedEnvironment.State.REPLICA)
+                {
+                    firstNodeReplicaStateLatch.countDown();
+                }
+                if (state == ReplicatedEnvironment.State.MASTER)
+                {
+                    firstNodeMasterStateLatch.countDown();
+                }
+            }
+        };
+        ReplicatedEnvironmentFacade firstNode = addNode(State.MASTER, 
stateChangeListener);
+        assertTrue("Environment did not become a master", 
firstNodeMasterStateLatch.await(10, TimeUnit.SECONDS));
+
+        int replica1Port = getNextAvailable(TEST_NODE_PORT + 1);
+        String node1NodeHostPort = "localhost:" + replica1Port;
+        ReplicatedEnvironmentFacade secondNode = createReplica(TEST_NODE_NAME 
+ "_1", node1NodeHostPort);
+        assertEquals("Unexpected state", 
ReplicatedEnvironment.State.REPLICA.name(), secondNode.getNodeState());
+
+        int replica2Port = getNextAvailable(replica1Port + 1);
+        String node2NodeHostPort = "localhost:" + replica2Port;
+        final CountDownLatch replicaStateLatch = new CountDownLatch(1);
+        final CountDownLatch masterStateLatch = new CountDownLatch(1);
+        StateChangeListener testStateChangeListener = new StateChangeListener()
+        {
+            @Override
+            public void stateChange(StateChangeEvent event) throws 
RuntimeException
+            {
+                ReplicatedEnvironment.State state = event.getState();
+                if (state == ReplicatedEnvironment.State.REPLICA)
+                {
+                    replicaStateLatch.countDown();
+                }
+                if (state == ReplicatedEnvironment.State.MASTER)
+                {
+                    masterStateLatch.countDown();
+                }
+            }
+        };
+        String thirdNodeName = TEST_NODE_NAME + "_2";
+        ReplicatedEnvironmentFacade thirdNode = addNode(thirdNodeName, 
node2NodeHostPort, TEST_DESIGNATED_PRIMARY, State.REPLICA, 
testStateChangeListener);
+        assertTrue("Environment did not become a replica", 
replicaStateLatch.await(10, TimeUnit.SECONDS));
+        assertEquals(3, thirdNode.getNumberOfElectableGroupMembers());
+
+        firstNode.transferMasterAsynchronously(thirdNodeName);
+        assertTrue("Environment did not become a master", 
masterStateLatch.await(10, TimeUnit.SECONDS));
+        assertTrue("First node environment did not become a replica", 
firstNodeReplicaStateLatch.await(10, TimeUnit.SECONDS));
+        assertEquals("Unexpected state", 
ReplicatedEnvironment.State.REPLICA.name(), firstNode.getNodeState());
+    }
+
     private ReplicatedEnvironmentFacade createMaster() throws Exception
     {
         TestStateChangeListener stateChangeListener = new 
TestStateChangeListener(State.MASTER);



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to