Author: orudyy
Date: Mon Jul 21 15:42:32 2014
New Revision: 1612322
URL: http://svn.apache.org/r1612322
Log:
QPID-5909: Allow setting of BDB HA message store durability many times
Modified:
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
Modified:
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java?rev=1612322&r1=1612321&r2=1612322&view=diff
==============================================================================
---
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
(original)
+++
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
Mon Jul 21 15:42:32 2014
@@ -163,13 +163,13 @@ public class ReplicatedEnvironmentFacade
private final AtomicReference<ReplicationGroupListener>
_replicationGroupListener = new AtomicReference<ReplicationGroupListener>();
private final AtomicReference<StateChangeListener> _stateChangeListener =
new AtomicReference<StateChangeListener>();
private final Durability _defaultDurability;
- private final AtomicReference<Durability> _messageStoreDurability = new
AtomicReference<Durability>();
private volatile Durability _realMessageStoreDurability = null;
private volatile CoalescingCommiter _coalescingCommiter = null;
private volatile ReplicatedEnvironment _environment;
private volatile long _joinTime;
private volatile ReplicatedEnvironment.State _lastKnownEnvironmentState;
+ private volatile Durability _messageStoreDurability;
private final ConcurrentHashMap<String, Database> _cachedDatabases = new
ConcurrentHashMap<>();
private final ConcurrentHashMap<DatabaseEntry, Sequence> _cachedSequences
= new ConcurrentHashMap<>();
@@ -208,7 +208,7 @@ public class ReplicatedEnvironmentFacade
@Override
public Transaction beginTransaction()
{
- if (_messageStoreDurability.get() == null)
+ if (_messageStoreDurability == null)
{
throw new IllegalStateException("Message store durability is not
set");
}
@@ -240,7 +240,7 @@ public class ReplicatedEnvironmentFacade
}
if (_coalescingCommiter != null &&
_realMessageStoreDurability.getLocalSync() == SyncPolicy.NO_SYNC
- && _messageStoreDurability.get().getLocalSync() ==
SyncPolicy.SYNC)
+ && _messageStoreDurability.getLocalSync() == SyncPolicy.SYNC)
{
return _coalescingCommiter.commit(tx, syncCommit);
}
@@ -545,7 +545,7 @@ public class ReplicatedEnvironmentFacade
public Durability getMessageStoreDurability()
{
- return _messageStoreDurability.get();
+ return _messageStoreDurability;
}
public boolean isCoalescingSync()
@@ -1105,10 +1105,24 @@ public class ReplicatedEnvironmentFacade
}
}
+ /**
+ * This method should only be invoked from configuration thread on virtual
host activation.
+ * Otherwise, invocation of this method whilst coalescing committer is
committing transactions might result in transaction aborts.
+ */
public void setMessageStoreDurability(SyncPolicy
localTransactionSynchronizationPolicy, SyncPolicy
remoteTransactionSynchronizationPolicy, ReplicaAckPolicy
replicaAcknowledgmentPolicy)
{
- if (_messageStoreDurability.compareAndSet(null, new
Durability(localTransactionSynchronizationPolicy,
remoteTransactionSynchronizationPolicy, replicaAcknowledgmentPolicy )))
+ if (_messageStoreDurability == null ||
localTransactionSynchronizationPolicy != _messageStoreDurability.getLocalSync()
+ || remoteTransactionSynchronizationPolicy !=
_messageStoreDurability.getReplicaSync()
+ || replicaAcknowledgmentPolicy !=
_messageStoreDurability.getReplicaAck())
{
+ _messageStoreDurability = new
Durability(localTransactionSynchronizationPolicy,
remoteTransactionSynchronizationPolicy, replicaAcknowledgmentPolicy);
+
+ if (_coalescingCommiter != null)
+ {
+ _coalescingCommiter.stop();
+ _coalescingCommiter = null;
+ }
+
if (localTransactionSynchronizationPolicy ==
LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY)
{
localTransactionSynchronizationPolicy = SyncPolicy.NO_SYNC;
@@ -1117,10 +1131,6 @@ public class ReplicatedEnvironmentFacade
}
_realMessageStoreDurability = new
Durability(localTransactionSynchronizationPolicy,
remoteTransactionSynchronizationPolicy, replicaAcknowledgmentPolicy);
}
- else
- {
- throw new IllegalStateException("Message store durability is
already set to " + _messageStoreDurability.get());
- }
}
public void setPermittedNodes(Collection<String> permittedNodes)
Modified:
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java?rev=1612322&r1=1612321&r2=1612322&view=diff
==============================================================================
---
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
(original)
+++
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
Mon Jul 21 15:42:32 2014
@@ -176,17 +176,13 @@ public class ReplicatedEnvironmentFacade
new Durability(Durability.SyncPolicy.NO_SYNC,
Durability.SyncPolicy.NO_SYNC, Durability.ReplicaAckPolicy.SIMPLE_MAJORITY),
master.getRealMessageStoreDurability());
assertEquals("Unexpected durability", TEST_DURABILITY,
master.getMessageStoreDurability());
- assertTrue("Unexpected coalescing syn", master.isCoalescingSync());
+ assertTrue("Unexpected coalescing sync", master.isCoalescingSync());
- try
- {
- master.setMessageStoreDurability(TEST_DURABILITY.getLocalSync(),
TEST_DURABILITY.getReplicaSync(), TEST_DURABILITY.getReplicaAck());
- fail("Cannot set message store durability twice");
- }
- catch(IllegalStateException e)
- {
- // pass
- }
+ master.setMessageStoreDurability(Durability.SyncPolicy.WRITE_NO_SYNC,
Durability.SyncPolicy.SYNC, Durability.ReplicaAckPolicy.ALL);
+ assertEquals("Unexpected message store durability",
+ new Durability(Durability.SyncPolicy.WRITE_NO_SYNC,
Durability.SyncPolicy.SYNC, Durability.ReplicaAckPolicy.ALL),
+ master.getRealMessageStoreDurability());
+ assertFalse("Coalescing sync committer is still running",
master.isCoalescingSync());
}
public void testGetNodeState() throws Exception
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]