Author: orudyy
Date: Mon Jul 21 15:42:32 2014
New Revision: 1612322

URL: http://svn.apache.org/r1612322
Log:
QPID-5909: Allow setting of BDB HA message store durability many times

Modified:
    
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
    
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java

Modified: 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java?rev=1612322&r1=1612321&r2=1612322&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
 Mon Jul 21 15:42:32 2014
@@ -163,13 +163,13 @@ public class ReplicatedEnvironmentFacade
     private final AtomicReference<ReplicationGroupListener> 
_replicationGroupListener = new AtomicReference<ReplicationGroupListener>();
     private final AtomicReference<StateChangeListener> _stateChangeListener = 
new AtomicReference<StateChangeListener>();
     private final Durability _defaultDurability;
-    private final AtomicReference<Durability> _messageStoreDurability = new 
AtomicReference<Durability>();
 
     private volatile Durability _realMessageStoreDurability = null;
     private volatile CoalescingCommiter _coalescingCommiter = null;
     private volatile ReplicatedEnvironment _environment;
     private volatile long _joinTime;
     private volatile ReplicatedEnvironment.State _lastKnownEnvironmentState;
+    private volatile Durability _messageStoreDurability;
 
     private final ConcurrentHashMap<String, Database> _cachedDatabases = new 
ConcurrentHashMap<>();
     private final ConcurrentHashMap<DatabaseEntry, Sequence> _cachedSequences 
= new ConcurrentHashMap<>();
@@ -208,7 +208,7 @@ public class ReplicatedEnvironmentFacade
     @Override
     public Transaction beginTransaction()
     {
-        if (_messageStoreDurability.get() == null)
+        if (_messageStoreDurability == null)
         {
             throw new IllegalStateException("Message store durability is not 
set");
         }
@@ -240,7 +240,7 @@ public class ReplicatedEnvironmentFacade
         }
 
         if (_coalescingCommiter != null && 
_realMessageStoreDurability.getLocalSync() == SyncPolicy.NO_SYNC
-                && _messageStoreDurability.get().getLocalSync() == 
SyncPolicy.SYNC)
+                && _messageStoreDurability.getLocalSync() == SyncPolicy.SYNC)
         {
             return _coalescingCommiter.commit(tx, syncCommit);
         }
@@ -545,7 +545,7 @@ public class ReplicatedEnvironmentFacade
 
     public Durability getMessageStoreDurability()
     {
-        return _messageStoreDurability.get();
+        return _messageStoreDurability;
     }
 
     public boolean isCoalescingSync()
@@ -1105,10 +1105,24 @@ public class ReplicatedEnvironmentFacade
         }
     }
 
+    /**
+     * This method should only be invoked from the configuration thread on 
virtual host activation.
+     * Otherwise, invoking this method whilst the coalescing committer is 
committing transactions might result in transaction aborts.
+     */
     public void setMessageStoreDurability(SyncPolicy 
localTransactionSynchronizationPolicy, SyncPolicy 
remoteTransactionSynchronizationPolicy, ReplicaAckPolicy 
replicaAcknowledgmentPolicy)
     {
-        if (_messageStoreDurability.compareAndSet(null, new 
Durability(localTransactionSynchronizationPolicy, 
remoteTransactionSynchronizationPolicy, replicaAcknowledgmentPolicy )))
+        if (_messageStoreDurability == null || 
localTransactionSynchronizationPolicy != _messageStoreDurability.getLocalSync()
+                || remoteTransactionSynchronizationPolicy != 
_messageStoreDurability.getReplicaSync()
+                || replicaAcknowledgmentPolicy != 
_messageStoreDurability.getReplicaAck())
         {
+            _messageStoreDurability = new 
Durability(localTransactionSynchronizationPolicy, 
remoteTransactionSynchronizationPolicy, replicaAcknowledgmentPolicy);
+
+            if (_coalescingCommiter != null)
+            {
+                _coalescingCommiter.stop();
+                _coalescingCommiter = null;
+            }
+
             if (localTransactionSynchronizationPolicy == 
LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY)
             {
                 localTransactionSynchronizationPolicy = SyncPolicy.NO_SYNC;
@@ -1117,10 +1131,6 @@ public class ReplicatedEnvironmentFacade
             }
             _realMessageStoreDurability = new 
Durability(localTransactionSynchronizationPolicy, 
remoteTransactionSynchronizationPolicy, replicaAcknowledgmentPolicy);
         }
-        else
-        {
-            throw new IllegalStateException("Message store durability is 
already set to " + _messageStoreDurability.get());
-        }
     }
 
     public void setPermittedNodes(Collection<String> permittedNodes)

Modified: 
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java?rev=1612322&r1=1612321&r2=1612322&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
 Mon Jul 21 15:42:32 2014
@@ -176,17 +176,13 @@ public class ReplicatedEnvironmentFacade
                 new Durability(Durability.SyncPolicy.NO_SYNC, 
Durability.SyncPolicy.NO_SYNC, Durability.ReplicaAckPolicy.SIMPLE_MAJORITY),
                 master.getRealMessageStoreDurability());
         assertEquals("Unexpected durability", TEST_DURABILITY, 
master.getMessageStoreDurability());
-        assertTrue("Unexpected coalescing syn", master.isCoalescingSync());
+        assertTrue("Unexpected coalescing sync", master.isCoalescingSync());
 
-        try
-        {
-            master.setMessageStoreDurability(TEST_DURABILITY.getLocalSync(), 
TEST_DURABILITY.getReplicaSync(), TEST_DURABILITY.getReplicaAck());
-            fail("Cannot set message store durability twice");
-        }
-        catch(IllegalStateException e)
-        {
-            // pass
-        }
+        master.setMessageStoreDurability(Durability.SyncPolicy.WRITE_NO_SYNC, 
Durability.SyncPolicy.SYNC, Durability.ReplicaAckPolicy.ALL);
+        assertEquals("Unexpected message store durability",
+                new Durability(Durability.SyncPolicy.WRITE_NO_SYNC, 
Durability.SyncPolicy.SYNC, Durability.ReplicaAckPolicy.ALL),
+                master.getRealMessageStoreDurability());
+        assertFalse("Coalescing sync committer is still running", 
master.isCoalescingSync());
     }
 
     public void testGetNodeState() throws Exception



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to