Author: orudyy
Date: Tue Jun  3 08:25:16 2014
New Revision: 1599445

URL: http://svn.apache.org/r1599445
Log:
QPID-5715: Use coalescing sync committer for message store transactions only 
when the synchronization policy SYNC is set for local transactions in BDB HA 
virtual host

Modified:
    
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
    
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
    
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
    
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java
    
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
    
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
    
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java
    
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java
    
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java
    
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
    
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
    
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
    
qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java

Modified: 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java?rev=1599445&r1=1599444&r2=1599445&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
 Tue Jun  3 08:25:16 2014
@@ -317,7 +317,7 @@ public class BDBMessageStore implements 
             {
                 if (_committer != null)
                 {
-                    _committer.stop();
+                    _committer.close();
                 }
             }
             finally
@@ -335,19 +335,9 @@ public class BDBMessageStore implements 
     {
         if (_configurationStoreOpen.compareAndSet(true, false))
         {
-            try
-            {
-                if (_committer != null)
-                {
-                    _committer.stop();
-                }
-            }
-            finally
+            if (!_messageStoreOpen.get())
             {
-                if (!_messageStoreOpen.get())
-                {
-                    closeEnvironment();
-                }
+                closeEnvironment();
             }
         }
     }
@@ -542,7 +532,7 @@ public class BDBMessageStore implements 
         com.sleepycat.je.Transaction txn = null;
         try
         {
-            txn = _environmentFacade.beginTransaction();
+            txn = _environmentFacade.getEnvironment().beginTransaction(null, 
null);
             storeConfiguredObjectEntry(txn, configuredObject);
             txn.commit();
             txn = null;
@@ -569,7 +559,7 @@ public class BDBMessageStore implements 
         com.sleepycat.je.Transaction txn = null;
         try
         {
-            txn = _environmentFacade.beginTransaction();
+            txn = _environmentFacade.getEnvironment().beginTransaction(null, 
null);
 
             Collection<UUID> removed = new ArrayList<UUID>(objects.length);
             for(ConfiguredObjectRecord record : objects)
@@ -606,7 +596,7 @@ public class BDBMessageStore implements 
         com.sleepycat.je.Transaction txn = null;
         try
         {
-            txn = _environmentFacade.beginTransaction();
+            txn = _environmentFacade.getEnvironment().beginTransaction(null, 
null);
             for(ConfiguredObjectRecord record : records)
             {
                 update(createIfNecessary, record, txn);

Modified: 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java?rev=1599445&r1=1599444&r2=1599445&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
 Tue Jun  3 08:25:16 2014
@@ -22,7 +22,11 @@ package org.apache.qpid.server.store.ber
 
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.server.store.StoreFuture;
@@ -33,55 +37,94 @@ import com.sleepycat.je.Transaction;
 
 public class CoalescingCommiter implements Committer
 {
-    private final CommitThread _commitThread;
+    private final CommitTask _commitTask;
+    private final AtomicReference<Boolean> _started;
+    private final ExecutorService _taskExecutor;
 
-    public CoalescingCommiter(String name, EnvironmentFacade environmentFacade)
+    public CoalescingCommiter(final String name, EnvironmentFacade 
environmentFacade)
     {
-        _commitThread = new CommitThread("Commit-Thread-" + name, 
environmentFacade);
+        _started = new AtomicReference<Boolean>(false);
+        _commitTask = new CommitTask(environmentFacade);
+        _taskExecutor = Executors.newSingleThreadExecutor(new ThreadFactory()
+        {
+            @Override
+            public Thread newThread(Runnable r)
+            {
+                return new Thread(r, "Commit-Thread-" + name);
+            }
+        });
     }
 
     @Override
     public void start()
     {
-        _commitThread.start();
+        if (_started.compareAndSet(false, true))
+        {
+            _taskExecutor.submit(_commitTask);
+        }
     }
 
     @Override
     public void stop()
     {
-        _commitThread.close();
+        if (_started.compareAndSet(true, false))
+        {
+            _commitTask.stop();
+        }
+    }
+
+    @Override
+    public void close()
+    {
         try
         {
-            _commitThread.join();
+            _started.set(false);
+            _commitTask.close();
         }
-        catch (InterruptedException ie)
+        finally
         {
-            Thread.currentThread().interrupt();
-            throw new RuntimeException("Commit thread has not shutdown", ie);
+            _taskExecutor.shutdown();
         }
     }
 
     @Override
     public StoreFuture commit(Transaction tx, boolean syncCommit)
     {
-        BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, 
syncCommit);
-        commitFuture.commit();
-        return commitFuture;
+        if (_started.get())
+        {
+            BDBCommitFuture commitFuture = new BDBCommitFuture(_commitTask, 
tx, syncCommit);
+            try
+            {
+                commitFuture.commit();
+                return commitFuture;
+            }
+            catch(IllegalStateException e)
+            {
+                // IllegalStateException is thrown when commit thread is 
stopped whilst commit is called
+            }
+        }
+
+        return StoreFuture.IMMEDIATE_FUTURE;
+    }
+
+    public boolean isStarted()
+    {
+        return _started.get();
     }
 
     private static final class BDBCommitFuture implements StoreFuture
     {
         private static final Logger LOGGER = 
Logger.getLogger(BDBCommitFuture.class);
 
-        private final CommitThread _commitThread;
+        private final CommitTask _commitTask;
         private final Transaction _tx;
         private final boolean _syncCommit;
         private RuntimeException _databaseException;
         private boolean _complete;
 
-        public BDBCommitFuture(CommitThread commitThread, Transaction tx, 
boolean syncCommit)
+        public BDBCommitFuture(CommitTask commitTask, Transaction tx, boolean 
syncCommit)
         {
-            _commitThread = commitThread;
+            _commitTask = commitTask;
             _tx = tx;
             _syncCommit = syncCommit;
         }
@@ -107,7 +150,7 @@ public class CoalescingCommiter implemen
 
         public void commit() throws DatabaseException
         {
-            _commitThread.addJob(this, _syncCommit);
+            _commitTask.addJob(this, _syncCommit);
 
             if(!_syncCommit)
             {
@@ -142,7 +185,12 @@ public class CoalescingCommiter implemen
 
             while (!isComplete())
             {
-                _commitThread.explicitNotify();
+                if (_commitTask.isStopped())
+                {
+                    throw new IllegalStateException("Commit thread is 
stopped");
+                }
+
+                _commitTask.explicitNotify();
                 try
                 {
                     wait(250);
@@ -162,28 +210,32 @@ public class CoalescingCommiter implemen
     }
 
     /**
-     * Implements a thread which batches and commits a queue of {@link 
BDBCommitFuture} operations. The commit operations
+     * Implements a {@link Runnable} which batches and commits a queue of 
{@link BDBCommitFuture} operations. The commit operations
      * themselves are responsible for adding themselves to the queue and 
waiting for the commit to happen before
      * continuing, but it is the responsibility of this thread to tell the 
commit operations when they have been
      * completed by calling back on their {@link BDBCommitFuture#complete()} 
and {@link BDBCommitFuture#abort} methods.
      *
      * <p/><table id="crc"><caption>CRC Card</caption> <tr><th> 
Responsibilities <th> Collaborations </table>
      */
-    private static class CommitThread extends Thread
+    private static class CommitTask implements Runnable
     {
-        private static final Logger LOGGER = 
Logger.getLogger(CommitThread.class);
+        private static final Logger LOGGER = 
Logger.getLogger(CommitTask.class);
 
-        private final AtomicBoolean _stopped = new AtomicBoolean(false);
+        private final AtomicBoolean _stopped = new AtomicBoolean(true);
         private final Queue<BDBCommitFuture> _jobQueue = new 
ConcurrentLinkedQueue<BDBCommitFuture>();
         private final Object _lock = new Object();
         private final EnvironmentFacade _environmentFacade;
 
-        public CommitThread(String name, EnvironmentFacade environmentFacade)
+        public CommitTask(EnvironmentFacade environmentFacade)
         {
-            super(name);
             _environmentFacade = environmentFacade;
         }
 
+        public boolean isStopped()
+        {
+            return _stopped.get();
+        }
+
         public void explicitNotify()
         {
             synchronized (_lock)
@@ -192,26 +244,30 @@ public class CoalescingCommiter implemen
             }
         }
 
+        @Override
         public void run()
         {
-            while (!_stopped.get())
+            if (_stopped.compareAndSet(true, false))
             {
-                synchronized (_lock)
+                while (!_stopped.get())
                 {
-                    while (!_stopped.get() && !hasJobs())
+                    synchronized (_lock)
                     {
-                        try
-                        {
-                            // Periodically wake up and check, just in case we
-                            // missed a notification. Don't want to lock the 
broker hard.
-                            _lock.wait(1000);
-                        }
-                        catch (InterruptedException e)
+                        while (!_stopped.get() && !hasJobs())
                         {
+                            try
+                            {
+                                // Periodically wake up and check, just in 
case we
+                                // missed a notification. Don't want to lock 
the broker hard.
+                                _lock.wait(1000);
+                            }
+                            catch (InterruptedException e)
+                            {
+                            }
                         }
                     }
+                    processJobs();
                 }
-                processJobs();
             }
         }
 
@@ -303,9 +359,20 @@ public class CoalescingCommiter implemen
             }
         }
 
+        public void stop()
+        {
+            synchronized (_lock)
+            {
+                if (_stopped.compareAndSet(false, true) || hasJobs())
+                {
+                    processJobs();
+                }
+            }
+        }
+
         public void close()
         {
-            RuntimeException e = new RuntimeException("Commit thread has been 
closed, transaction aborted");
+            RuntimeException e = new RuntimeException("Commit thread has been 
stopped");
             synchronized (_lock)
             {
                 _stopped.set(true);

Modified: 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java?rev=1599445&r1=1599444&r2=1599445&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
 Tue Jun  3 08:25:16 2014
@@ -32,6 +32,10 @@ public interface Committer
 
     void stop();
 
+    void close();
+
+    boolean isStarted();
+
     Committer IMMEDIATE_FUTURE_COMMITTER = new Committer()
     {
 
@@ -50,6 +54,17 @@ public interface Committer
         public void stop()
         {
         }
+
+        @Override
+        public void close()
+        {
+        }
+
+        @Override
+        public boolean isStarted()
+        {
+            return true;
+        }
     };
 
 }
\ No newline at end of file

Modified: 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java?rev=1599445&r1=1599444&r2=1599445&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java
 Tue Jun  3 08:25:16 2014
@@ -29,8 +29,6 @@ public interface ReplicatedEnvironmentCo
     String getGroupName();
     String getHostPort();
     String getHelperHostPort();
-    String getDurability();
-    boolean isCoalescingSync();
     boolean isDesignatedPrimary();
     int getPriority();
     int getQuorumOverride();

Modified: 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java?rev=1599445&r1=1599444&r2=1599445&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
 Tue Jun  3 08:25:16 2014
@@ -99,8 +99,9 @@ public class ReplicatedEnvironmentFacade
     private static final int REMOTE_NODE_MONITOR_INTERVAL = 
Integer.getInteger(REMOTE_NODE_MONITOR_INTERVAL_PROPERTY_NAME, 
DEFAULT_REMOTE_NODE_MONITOR_INTERVAL);
     private static final int RESTART_TRY_LIMIT = 3;
 
-    static final SyncPolicy LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY = 
SyncPolicy.NO_SYNC;
+    static final SyncPolicy LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY = 
SyncPolicy.SYNC;
     static final SyncPolicy REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY = 
SyncPolicy.NO_SYNC;
+    static final ReplicaAckPolicy REPLICA_REPLICA_ACKNOWLEDGMENT_POLICY = 
ReplicaAckPolicy.SIMPLE_MAJORITY;
 
     @SuppressWarnings("serial")
     private static final Map<String, String> REPCONFIG_DEFAULTS = 
Collections.unmodifiableMap(new HashMap<String, String>()
@@ -153,12 +154,14 @@ public class ReplicatedEnvironmentFacade
     private final AtomicReference<StateChangeListener> _stateChangeListener = 
new AtomicReference<StateChangeListener>();
     private final AtomicBoolean _initialised;
     private final EnvironmentFacadeTask[] _initialisationTasks;
+    private final Durability _defaultDurability;
+    private final CoalescingCommiter _coalescingCommiter;
 
     private volatile ReplicatedEnvironment _environment;
     private volatile long _joinTime;
     private volatile ReplicatedEnvironment.State _lastKnownEnvironmentState;
-    private volatile SyncPolicy _localTransactionSyncronizationPolicy = 
LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY;
-    private volatile SyncPolicy _remoteTransactionSyncronizationPolicy = 
REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY;
+    private volatile SyncPolicy 
_messageStoreLocalTransactionSyncronizationPolicy = 
LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY;
+    private volatile SyncPolicy 
_messageStoreRemoteTransactionSyncronizationPolicy = 
REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY;
 
     public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration 
configuration, EnvironmentFacadeTask[] initialisationTasks)
     {
@@ -175,13 +178,15 @@ public class ReplicatedEnvironmentFacade
         _initialised = new AtomicBoolean();
         _initialisationTasks = initialisationTasks;
         _configuration = configuration;
-
+        _defaultDurability = new 
Durability(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY, 
REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, 
REPLICA_REPLICA_ACKNOWLEDGMENT_POLICY);
         _prettyGroupNodeName = _configuration.getGroupName() + ":" + 
_configuration.getName();
 
         // we relay on this executor being single-threaded as we need to 
restart and mutate the environment in one thread
         _environmentJobExecutor = Executors.newSingleThreadExecutor(new 
DaemonThreadFactory("Environment-" + _prettyGroupNodeName));
         _groupChangeExecutor = 
Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() + 
1, new DaemonThreadFactory("Group-Change-Learner:" + _prettyGroupNodeName));
 
+        _coalescingCommiter  = new 
CoalescingCommiter(_configuration.getGroupName(), this);
+
         // create environment in a separate thread to avoid renaming of the 
current thread by JE
         _environment = createEnvironment(true);
         populateExistingRemoteReplicationNodes();
@@ -193,9 +198,8 @@ public class ReplicatedEnvironmentFacade
     {
         try
         {
-            Durability durability = getDurability();
             TransactionConfig transactionConfig = new TransactionConfig();
-            transactionConfig.setDurability(durability);
+            
transactionConfig.setDurability(getMessageStoreTransactionDurability());
             return _environment.beginTransaction(null, transactionConfig);
         }
         catch(DatabaseException e)
@@ -511,16 +515,26 @@ public class ReplicatedEnvironmentFacade
         return (String)_configuration.getHelperHostPort();
     }
 
+    Durability getMessageStoreTransactionDurability()
+    {
+        SyncPolicy localSync = 
getMessageStoreLocalTransactionSyncronizationPolicy();
+        if ( localSync == LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY && 
_coalescingCommiter.isStarted())
+        {
+            localSync = SyncPolicy.NO_SYNC;
+        }
+        SyncPolicy replicaSync = 
getMessageStoreRemoteTransactionSyncronizationPolicy();
+        return new Durability(localSync, replicaSync, 
getReplicaAcknowledgmentPolicy());
+    }
+
     public Durability getDurability()
     {
-        SyncPolicy localSync = getLocalTransactionSyncronizationPolicy();
-        SyncPolicy replicaSync = getRemoteTransactionSyncronizationPolicy();
-        return new Durability(localSync, replicaSync, 
ReplicaAckPolicy.SIMPLE_MAJORITY);
+        return new 
Durability(getMessageStoreLocalTransactionSyncronizationPolicy(),
+                getMessageStoreRemoteTransactionSyncronizationPolicy(), 
getReplicaAcknowledgmentPolicy());
     }
 
     public boolean isCoalescingSync()
     {
-        return true;
+        return _coalescingCommiter.isStarted();
     }
 
     public String getNodeState()
@@ -905,7 +919,6 @@ public class ReplicatedEnvironmentFacade
         boolean designatedPrimary = _configuration.isDesignatedPrimary();
         int priority = _configuration.getPriority();
         int quorumOverride = _configuration.getQuorumOverride();
-        Durability durability = getDurability();
 
         if (LOGGER.isInfoEnabled())
         {
@@ -915,7 +928,7 @@ public class ReplicatedEnvironmentFacade
             LOGGER.info("Node name " + _configuration.getName());
             LOGGER.info("Node host port " + hostPort);
             LOGGER.info("Helper host port " + helperHostPort);
-            LOGGER.info("Durability " + durability);
+            LOGGER.info("Durability " + _defaultDurability);
             LOGGER.info("Designated primary (applicable to 2 node case only) " 
+ designatedPrimary);
             LOGGER.info("Node priority " + priority);
             LOGGER.info("Quorum override " + quorumOverride);
@@ -951,7 +964,7 @@ public class ReplicatedEnvironmentFacade
         envConfig.setAllowCreate(true);
         envConfig.setTransactional(true);
         envConfig.setExceptionListener(new LoggingAsyncExceptionListener());
-        envConfig.setDurability(durability);
+        envConfig.setDurability(_defaultDurability);
 
         for (Map.Entry<String, String> configItem : 
environmentSettings.entrySet())
         {
@@ -1037,7 +1050,7 @@ public class ReplicatedEnvironmentFacade
     @Override
     public Committer createCommitter(String name)
     {
-        return new CoalescingCommiter(name, this);
+        return _coalescingCommiter;
     }
 
     NodeState getRemoteNodeState(ReplicationNode repNode) throws IOException, 
ServiceConnectFailedException
@@ -1075,24 +1088,37 @@ public class ReplicatedEnvironmentFacade
         }
     }
 
-    public SyncPolicy getLocalTransactionSyncronizationPolicy()
+    public SyncPolicy getMessageStoreLocalTransactionSyncronizationPolicy()
     {
-        return _localTransactionSyncronizationPolicy;
+        return _messageStoreLocalTransactionSyncronizationPolicy;
     }
 
-    public SyncPolicy getRemoteTransactionSyncronizationPolicy()
+    public SyncPolicy getMessageStoreRemoteTransactionSyncronizationPolicy()
     {
-        return _remoteTransactionSyncronizationPolicy;
+        return _messageStoreRemoteTransactionSyncronizationPolicy;
     }
 
-    public void setLocalTransactionSyncronizationPolicy(SyncPolicy 
localTransactionSyncronizationPolicy)
+    public ReplicaAckPolicy getReplicaAcknowledgmentPolicy()
     {
-        _localTransactionSyncronizationPolicy = 
localTransactionSyncronizationPolicy;
+        return REPLICA_REPLICA_ACKNOWLEDGMENT_POLICY;
+    }
+
+    public void 
setMessageStoreLocalTransactionSynchronizationPolicy(SyncPolicy 
localTransactionSyncronizationPolicy)
+    {
+        _messageStoreLocalTransactionSyncronizationPolicy = 
localTransactionSyncronizationPolicy;
+        if (localTransactionSyncronizationPolicy == 
LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY)
+        {
+            _coalescingCommiter.start();
+        }
+        else
+        {
+            _coalescingCommiter.stop();
+        }
     }
 
-    public void setRemoteTransactionSyncronizationPolicy(SyncPolicy 
remoteTransactionSyncronizationPolicy)
+    public void 
setMessageStoreRemoteTransactionSyncrhonizationPolicy(SyncPolicy 
remoteTransactionSyncronizationPolicy)
     {
-        _remoteTransactionSyncronizationPolicy = 
remoteTransactionSyncronizationPolicy;
+        _messageStoreRemoteTransactionSyncronizationPolicy = 
remoteTransactionSyncronizationPolicy;
     }
 
     private void populateExistingRemoteReplicationNodes()

Modified: 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java?rev=1599445&r1=1599444&r2=1599445&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
 Tue Jun  3 08:25:16 2014
@@ -24,7 +24,7 @@ import java.util.Map;
 
 import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
 import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory;
-import 
org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNodeImpl;
+import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode;
 
 public class ReplicatedEnvironmentFacadeFactory implements 
EnvironmentFacadeFactory
 {
@@ -36,76 +36,64 @@ public class ReplicatedEnvironmentFacade
             @Override
             public boolean isDesignatedPrimary()
             {
-                return 
(Boolean)messageStoreSettings.get(BDBHAVirtualHostNodeImpl.DESIGNATED_PRIMARY);
-            }
-
-            @Override
-            public boolean isCoalescingSync()
-            {
-                return 
(Boolean)messageStoreSettings.get(BDBHAVirtualHostNodeImpl.COALESCING_SYNC);
+                return 
(Boolean)messageStoreSettings.get(BDBHAVirtualHostNode.DESIGNATED_PRIMARY);
             }
 
             @Override
             public String getStorePath()
             {
-                return (String) 
messageStoreSettings.get(BDBHAVirtualHostNodeImpl.STORE_PATH);
+                return (String) 
messageStoreSettings.get(BDBHAVirtualHostNode.STORE_PATH);
             }
 
             @SuppressWarnings("unchecked")
             @Override
             public Map<String, String> getParameters()
             {
-                return (Map<String, String>) 
messageStoreSettings.get(BDBHAVirtualHostNodeImpl.ENVIRONMENT_CONFIGURATION);
+                return (Map<String, String>) 
messageStoreSettings.get(BDBHAVirtualHostNode.ENVIRONMENT_CONFIGURATION);
             }
 
             @SuppressWarnings("unchecked")
             @Override
             public Map<String, String> getReplicationParameters()
             {
-                return (Map<String, String>) 
messageStoreSettings.get(BDBHAVirtualHostNodeImpl.REPLICATED_ENVIRONMENT_CONFIGURATION);
+                return (Map<String, String>) 
messageStoreSettings.get(BDBHAVirtualHostNode.REPLICATED_ENVIRONMENT_CONFIGURATION);
             }
 
             @Override
             public int getQuorumOverride()
             {
-                return 
(Integer)messageStoreSettings.get(BDBHAVirtualHostNodeImpl.QUORUM_OVERRIDE);
+                return 
(Integer)messageStoreSettings.get(BDBHAVirtualHostNode.QUORUM_OVERRIDE);
             }
 
             @Override
             public int getPriority()
             {
-                return 
(Integer)messageStoreSettings.get(BDBHAVirtualHostNodeImpl.PRIORITY);
+                return 
(Integer)messageStoreSettings.get(BDBHAVirtualHostNode.PRIORITY);
             }
 
             @Override
             public String getName()
             {
-                return 
(String)messageStoreSettings.get(BDBHAVirtualHostNodeImpl.NAME);
+                return 
(String)messageStoreSettings.get(BDBHAVirtualHostNode.NAME);
             }
 
             @Override
             public String getHostPort()
             {
-                return 
(String)messageStoreSettings.get(BDBHAVirtualHostNodeImpl.ADDRESS);
+                return 
(String)messageStoreSettings.get(BDBHAVirtualHostNode.ADDRESS);
             }
 
             @Override
             public String getHelperHostPort()
             {
-                return 
(String)messageStoreSettings.get(BDBHAVirtualHostNodeImpl.HELPER_ADDRESS);
+                return 
(String)messageStoreSettings.get(BDBHAVirtualHostNode.HELPER_ADDRESS);
             }
 
             @Override
             public String getGroupName()
             {
-                return 
(String)messageStoreSettings.get(BDBHAVirtualHostNodeImpl.GROUP_NAME);
+                return 
(String)messageStoreSettings.get(BDBHAVirtualHostNode.GROUP_NAME);
             }
-
-            @Override
-            public String getDurability()
-            {
-                return 
(String)messageStoreSettings.get(BDBHAVirtualHostNodeImpl.DURABILITY);
-             }
         };
         return new ReplicatedEnvironmentFacade(configuration, 
initialisationTasks);
 

Modified: 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java?rev=1599445&r1=1599444&r2=1599445&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java
 Tue Jun  3 08:25:16 2014
@@ -21,18 +21,27 @@
 package org.apache.qpid.server.virtualhost.berkeleydb;
 
 import org.apache.qpid.server.exchange.ExchangeImpl;
+import org.apache.qpid.server.model.DerivedAttribute;
 import org.apache.qpid.server.model.ManagedAttribute;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 
 public interface BDBHAVirtualHost<X extends BDBHAVirtualHost<X>> extends 
VirtualHostImpl<X, AMQQueue<?>, ExchangeImpl<?>>
 {
-    String REMOTE_TRANSACTION_SYNCRONIZATION_POLICY = 
"remoteTransactionSyncronizationPolicy";
-    String LOCAL_TRANSACTION_SYNCRONIZATION_POLICY = 
"localTransactionSyncronizationPolicy";
+    String REMOTE_TRANSACTION_SYNCRONIZATION_POLICY = 
"remoteTransactionSynchronizationPolicy";
+    String LOCAL_TRANSACTION_SYNCRONIZATION_POLICY = 
"localTransactionSynchronizationPolicy";
+    String COALESCING_SYNC = "coalescingSync";
+    String REPLICA_ACKNOWLEDGMENT_POLICY = "replicaAcknowledgmentPolicy";
 
-    @ManagedAttribute( defaultValue = "NO_SYNC")
-    String getLocalTransactionSyncronizationPolicy();
+    @ManagedAttribute( defaultValue = "SYNC")
+    String getLocalTransactionSynchronizationPolicy();
 
     @ManagedAttribute( defaultValue = "NO_SYNC")
-    String getRemoteTransactionSyncronizationPolicy();
+    String getRemoteTransactionSynchronizationPolicy();
+
+    @DerivedAttribute
+    String getReplicaAcknowledgmentPolicy();
+
+    @DerivedAttribute
+    boolean isCoalescingSync();
 }

Modified: 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java?rev=1599445&r1=1599444&r2=1599445&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java
 Tue Jun  3 08:25:16 2014
@@ -45,11 +45,11 @@ public class BDBHAVirtualHostImpl extend
     private final BDBMessageStore _messageStore;
     private MessageStoreLogSubject _messageStoreLogSubject;
 
-    
@ManagedAttributeField(afterSet="setLocalTransactionSyncronizationPolicyOnEnvironment")
-    private String _localTransactionSyncronizationPolicy;
+    
@ManagedAttributeField(afterSet="setLocalTransactionSynchronizationPolicyOnEnvironment")
+    private String _localTransactionSynchronizationPolicy;
 
-    
@ManagedAttributeField(afterSet="setRemoteTransactionSyncronizationPolicyOnEnvironment")
-    private String _remoteTransactionSyncronizationPolicy;
+    
@ManagedAttributeField(afterSet="setRemoteTransactionSynchronizationPolicyOnEnvironment")
+    private String _remoteTransactionSynchronizationPolicy;
 
     @ManagedObjectFactoryConstructor
     public BDBHAVirtualHostImpl(final Map<String, Object> attributes, 
VirtualHostNode<?> virtualHostNode)
@@ -84,23 +84,46 @@ public class BDBHAVirtualHostImpl extend
     }
 
     @Override
-    public String getLocalTransactionSyncronizationPolicy()
+    public String getLocalTransactionSynchronizationPolicy()
     {
-        return _localTransactionSyncronizationPolicy;
+        return _localTransactionSynchronizationPolicy;
     }
 
     @Override
-    public String getRemoteTransactionSyncronizationPolicy()
+    public String getRemoteTransactionSynchronizationPolicy()
     {
-        return _remoteTransactionSyncronizationPolicy;
+        return _remoteTransactionSynchronizationPolicy;
+    }
+
+
+    @Override
+    public String getReplicaAcknowledgmentPolicy()
+    {
+        ReplicatedEnvironmentFacade facade = getReplicatedEnvironmentFacade();
+        if (facade != null)
+        {
+            return facade.getReplicaAcknowledgmentPolicy().name();
+        }
+        return null;
+    }
+
+    @Override
+    public boolean isCoalescingSync()
+    {
+        ReplicatedEnvironmentFacade facade = getReplicatedEnvironmentFacade();
+        if (facade != null)
+        {
+            return facade.isCoalescingSync();
+        }
+        return false;
     }
 
     @Override
     public void onOpen()
     {
         super.onOpen();
-        setRemoteTransactionSyncronizationPolicyOnEnvironment();
-        setLocalTransactionSyncronizationPolicyOnEnvironment();
+        setRemoteTransactionSynchronizationPolicyOnEnvironment();
+        setLocalTransactionSynchronizationPolicyOnEnvironment();
     }
 
     @Override
@@ -110,13 +133,13 @@ public class BDBHAVirtualHostImpl extend
 
         if(changedAttributes.contains(LOCAL_TRANSACTION_SYNCRONIZATION_POLICY))
         {
-            String policy = 
((BDBHAVirtualHost<?>)proxyForValidation).getLocalTransactionSyncronizationPolicy();
+            String policy = 
((BDBHAVirtualHost<?>)proxyForValidation).getLocalTransactionSynchronizationPolicy();
             validateTransactionSynchronizationPolicy(policy);
         }
 
         
if(changedAttributes.contains(REMOTE_TRANSACTION_SYNCRONIZATION_POLICY))
         {
-            String policy = 
((BDBHAVirtualHost<?>)proxyForValidation).getRemoteTransactionSyncronizationPolicy();
+            String policy = 
((BDBHAVirtualHost<?>)proxyForValidation).getRemoteTransactionSynchronizationPolicy();
             validateTransactionSynchronizationPolicy(policy);
         }
     }
@@ -133,21 +156,21 @@ public class BDBHAVirtualHostImpl extend
         }
     }
 
-    protected void setLocalTransactionSyncronizationPolicyOnEnvironment()
+    protected void setLocalTransactionSynchronizationPolicyOnEnvironment()
     {
         ReplicatedEnvironmentFacade facade = getReplicatedEnvironmentFacade();
         if (facade != null)
         {
-            
facade.setLocalTransactionSyncronizationPolicy(SyncPolicy.valueOf(getLocalTransactionSyncronizationPolicy()));
+            
facade.setMessageStoreLocalTransactionSynchronizationPolicy(SyncPolicy.valueOf(getLocalTransactionSynchronizationPolicy()));
         }
     }
 
-    protected void setRemoteTransactionSyncronizationPolicyOnEnvironment()
+    protected void setRemoteTransactionSynchronizationPolicyOnEnvironment()
     {
         ReplicatedEnvironmentFacade facade = getReplicatedEnvironmentFacade();
         if (facade != null)
         {
-            
facade.setRemoteTransactionSyncronizationPolicy(SyncPolicy.valueOf(getRemoteTransactionSyncronizationPolicy()));
+            
facade.setMessageStoreRemoteTransactionSyncrhonizationPolicy(SyncPolicy.valueOf(getRemoteTransactionSynchronizationPolicy()));
         }
     }
 
@@ -155,4 +178,5 @@ public class BDBHAVirtualHostImpl extend
     {
         return 
(ReplicatedEnvironmentFacade)_messageStore.getEnvironmentFacade();
     }
+
 }

Modified: 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java?rev=1599445&r1=1599444&r2=1599445&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNode.java
 Tue Jun  3 08:25:16 2014
@@ -31,7 +31,6 @@ public interface BDBHAVirtualHostNode<X 
     public static final String ADDRESS = "address";
     public static final String HELPER_ADDRESS = "helperAddress";
     public static final String DURABILITY = "durability";
-    public static final String COALESCING_SYNC = "coalescingSync";
     public static final String DESIGNATED_PRIMARY = "designatedPrimary";
     public static final String PRIORITY = "priority";
     public static final String QUORUM_OVERRIDE = "quorumOverride";

Modified: 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java?rev=1599445&r1=1599444&r2=1599445&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
 Tue Jun  3 08:25:16 2014
@@ -177,7 +177,7 @@ public class BDBHAVirtualHostNodeImpl ex
     @Override
     public String getDurability()
     {
-        ReplicatedEnvironmentFacade environmentFacade = 
getReplicatedEnvironmentFacade();
+        ReplicatedEnvironmentFacade environmentFacade = 
_environmentFacade.get();
         if (environmentFacade != null)
         {
             return environmentFacade.getDurability().toString();

Modified: 
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java?rev=1599445&r1=1599444&r2=1599445&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
 Tue Jun  3 08:25:16 2014
@@ -174,7 +174,7 @@ public class BDBHAVirtualHostNodeTest ex
         assertEquals(nodeHostPort, replicationConfig.getNodeHostPort());
         assertEquals(helperHostPort, replicationConfig.getHelperHosts());
 
-        assertEquals("NO_SYNC,NO_SYNC,SIMPLE_MAJORITY", 
environment.getConfig().getDurability().toString());
+        assertEquals("SYNC,NO_SYNC,SIMPLE_MAJORITY", 
environment.getConfig().getDurability().toString());
         assertEquals("Unexpected JE replication stream timeout", 
repStreamTimeout, 
replicationConfig.getConfigParam(ReplicationConfig.REP_STREAM_TIMEOUT));
 
         assertTrue("Virtual host child has not been added", 
virtualHostAddedLatch.await(30, TimeUnit.SECONDS));
@@ -465,17 +465,21 @@ public class BDBHAVirtualHostNodeTest ex
         BDBHAVirtualHostImpl virtualHost = 
(BDBHAVirtualHostImpl)node.getVirtualHost();
         assertNotNull("Virtual host is not created", virtualHost);
 
-        assertEquals("Unexpected local transaction synchronization policy", 
"NO_SYNC", virtualHost.getLocalTransactionSyncronizationPolicy());
-        assertEquals("Unexpected remote transaction synchronization policy", 
"NO_SYNC", virtualHost.getRemoteTransactionSyncronizationPolicy());
+        awaitForAttributeChange(virtualHost, 
BDBHAVirtualHostImpl.COALESCING_SYNC, true);
+
+        assertEquals("Unexpected local transaction synchronization policy", 
"SYNC", virtualHost.getLocalTransactionSynchronizationPolicy());
+        assertEquals("Unexpected remote transaction synchronization policy", 
"NO_SYNC", virtualHost.getRemoteTransactionSynchronizationPolicy());
+        assertTrue("CoalescingSync is not ON", virtualHost.isCoalescingSync());
 
         Map<String, Object> virtualHostAttributes = new 
HashMap<String,Object>();
         
virtualHostAttributes.put(BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCRONIZATION_POLICY,
 "WRITE_NO_SYNC");
         
virtualHostAttributes.put(BDBHAVirtualHost.REMOTE_TRANSACTION_SYNCRONIZATION_POLICY,
 "SYNC");
         virtualHost.setAttributes(virtualHostAttributes);
 
-        assertEquals("Unexpected local transaction synchronization policy", 
"WRITE_NO_SYNC", virtualHost.getLocalTransactionSyncronizationPolicy());
-        assertEquals("Unexpected remote transaction synchronization policy", 
"SYNC", virtualHost.getRemoteTransactionSyncronizationPolicy());
-
+        awaitForAttributeChange(virtualHost, 
BDBHAVirtualHostImpl.COALESCING_SYNC, false);
+        assertEquals("Unexpected local transaction synchronization policy", 
"WRITE_NO_SYNC", virtualHost.getLocalTransactionSynchronizationPolicy());
+        assertEquals("Unexpected remote transaction synchronization policy", 
"SYNC", virtualHost.getRemoteTransactionSynchronizationPolicy());
+        assertFalse("CoalescingSync is not OFF", 
virtualHost.isCoalescingSync());
         try
         {
             virtualHost.setAttributes(Collections.<String, 
Object>singletonMap(BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCRONIZATION_POLICY, 
"INVALID"));

Modified: 
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java?rev=1599445&r1=1599444&r2=1599445&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
 Tue Jun  3 08:25:16 2014
@@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.qpid.server.store.berkeleydb.Committer;
 import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
 import org.apache.qpid.test.utils.QpidTestCase;
 import org.apache.qpid.test.utils.TestFileUtils;
@@ -62,9 +63,8 @@ public class ReplicatedEnvironmentFacade
     private static final String TEST_NODE_NAME = "testNodeName";
     private static final String TEST_NODE_HOST_PORT = "localhost:" + 
TEST_NODE_PORT;
     private static final String TEST_NODE_HELPER_HOST_PORT = 
TEST_NODE_HOST_PORT;
-    private static final String TEST_DURABILITY = 
Durability.parse("NO_SYNC,NO_SYNC,SIMPLE_MAJORITY").toString();
+    private static final Durability TEST_DURABILITY = 
Durability.parse("SYNC,NO_SYNC,SIMPLE_MAJORITY");
     private static final boolean TEST_DESIGNATED_PRIMARY = false;
-    private static final boolean TEST_COALESCING_SYNC = true;
     private static final int TEST_PRIORITY = 1;
     private static final int TEST_ELECTABLE_GROUP_OVERRIDE = 0;
 
@@ -185,12 +185,32 @@ public class ReplicatedEnvironmentFacade
 
     public void testGetDurability() throws Exception
     {
-        assertEquals("Unexpected durability", TEST_DURABILITY.toString(), 
createMaster().getDurability().toString());
+        ReplicatedEnvironmentFacade master = createMaster();
+        assertEquals("Unexpected message store durability", TEST_DURABILITY, 
master.getMessageStoreTransactionDurability());
+        assertEquals("Unexpected durability", TEST_DURABILITY, 
master.getDurability());
+        Committer committer = master.createCommitter(TEST_GROUP_NAME);
+        committer.start();
+
+        waitForCommitter(committer, true);
+
+        assertEquals("Unexpected message store durability after committer 
start", "NO_SYNC,NO_SYNC,SIMPLE_MAJORITY", 
master.getMessageStoreTransactionDurability().toString());
+
+        committer.stop();
+        waitForCommitter(committer, false);
+        assertEquals("Unexpected message store durability after committer 
stop", TEST_DURABILITY, master.getMessageStoreTransactionDurability());
     }
 
     public void testIsCoalescingSync() throws Exception
     {
-        assertEquals("Unexpected coalescing sync", TEST_COALESCING_SYNC, 
createMaster().isCoalescingSync());
+        ReplicatedEnvironmentFacade master = createMaster();
+        assertEquals("Unexpected coalescing sync", false, 
master.isCoalescingSync());
+        Committer committer = master.createCommitter(TEST_GROUP_NAME);
+        committer.start();
+        waitForCommitter(committer, true);
+        assertEquals("Unexpected coalescing sync", true, 
master.isCoalescingSync());
+        committer.stop();
+        waitForCommitter(committer, false);
+        assertEquals("Unexpected coalescing sync", false, 
master.isCoalescingSync());
     }
 
     public void testGetNodeState() throws Exception
@@ -690,20 +710,20 @@ public class ReplicatedEnvironmentFacade
     {
         ReplicatedEnvironmentFacade facade = createMaster();
         assertEquals("Unexpected local transaction synchronization policy 
before change",
-                
ReplicatedEnvironmentFacade.LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY, 
facade.getLocalTransactionSyncronizationPolicy());
-        
facade.setLocalTransactionSyncronizationPolicy(SyncPolicy.WRITE_NO_SYNC);
+                
ReplicatedEnvironmentFacade.LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY, 
facade.getMessageStoreLocalTransactionSyncronizationPolicy());
+        
facade.setMessageStoreLocalTransactionSynchronizationPolicy(SyncPolicy.WRITE_NO_SYNC);
         assertEquals("Unexpected local transaction synchronization policy 
after change",
-                SyncPolicy.WRITE_NO_SYNC, 
facade.getLocalTransactionSyncronizationPolicy());
+                SyncPolicy.WRITE_NO_SYNC, 
facade.getMessageStoreLocalTransactionSyncronizationPolicy());
     }
 
     public void testSetRemoteTransactionSyncronizationPolicy() throws Exception
     {
         ReplicatedEnvironmentFacade facade = createMaster();
         assertEquals("Unexpected remote transaction synchronization policy 
before change",
-                
ReplicatedEnvironmentFacade.REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, 
facade.getRemoteTransactionSyncronizationPolicy());
-        
facade.setRemoteTransactionSyncronizationPolicy(SyncPolicy.WRITE_NO_SYNC);
+                
ReplicatedEnvironmentFacade.REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, 
facade.getMessageStoreRemoteTransactionSyncronizationPolicy());
+        
facade.setMessageStoreRemoteTransactionSyncrhonizationPolicy(SyncPolicy.WRITE_NO_SYNC);
         assertEquals("Unexpected remote transaction synchronization policy 
after change",
-                SyncPolicy.WRITE_NO_SYNC, 
facade.getRemoteTransactionSyncronizationPolicy());
+                SyncPolicy.WRITE_NO_SYNC, 
facade.getMessageStoreRemoteTransactionSyncronizationPolicy());
     }
 
     public void testBeginTransaction() throws Exception
@@ -780,6 +800,17 @@ public class ReplicatedEnvironmentFacade
         return dbConfig;
     }
 
+    private void waitForCommitter(Committer committer, boolean expected) 
throws InterruptedException
+    {
+        int counter = 0;
+        while(committer.isStarted() != expected && counter < 100)
+        {
+            Thread.sleep(20);
+            counter++;
+        }
+        assertEquals("Committer is not in expected state", expected, 
committer.isStarted());
+    }
+
     private ReplicatedEnvironmentConfiguration 
createReplicatedEnvironmentConfiguration(String nodeName, String nodeHostPort, 
boolean designatedPrimary)
     {
         ReplicatedEnvironmentConfiguration node = 
mock(ReplicatedEnvironmentConfiguration.class);
@@ -790,8 +821,6 @@ public class ReplicatedEnvironmentFacade
         when(node.getPriority()).thenReturn(TEST_PRIORITY);
         when(node.getGroupName()).thenReturn(TEST_GROUP_NAME);
         when(node.getHelperHostPort()).thenReturn(TEST_NODE_HELPER_HOST_PORT);
-        when(node.getDurability()).thenReturn(TEST_DURABILITY);
-        when(node.isCoalescingSync()).thenReturn(TEST_COALESCING_SYNC);
 
         Map<String, String> repConfig = new HashMap<String, String>();
         repConfig.put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "2 s");

Modified: 
qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java?rev=1599445&r1=1599444&r2=1599445&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java
 Tue Jun  3 08:25:16 2014
@@ -95,23 +95,25 @@ public class BDBHAVirtualHostRestTest ex
 
     public void testSetLocalTransactionSynchronizationPolicy() throws Exception
     {
-        
assertSetTransactionSynchronizationPolicy(LOCAL_TRANSACTION_SYNCRONIZATION_POLICY);
-    }
+        Map<String, Object> hostAttributes = 
waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, 
State.ACTIVE.name());
+        assertEquals("Unexpected synchronization policy before change", 
"SYNC", hostAttributes.get(LOCAL_TRANSACTION_SYNCRONIZATION_POLICY));
 
-    public void testSetRemoteTransactionSynchronizationPolicy() throws 
Exception
-    {
-        
assertSetTransactionSynchronizationPolicy(REMOTE_TRANSACTION_SYNCRONIZATION_POLICY);
+        Map<String, Object> newPolicy = Collections.<String, 
Object>singletonMap(LOCAL_TRANSACTION_SYNCRONIZATION_POLICY, "NO_SYNC");
+        getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newPolicy, 
200);
+
+        hostAttributes = 
getRestTestHelper().getJsonAsSingletonList(_virtualhostUrl);
+        assertEquals("Unexpected synchronization policy after change", 
"NO_SYNC", hostAttributes.get(LOCAL_TRANSACTION_SYNCRONIZATION_POLICY));
     }
 
-    private void assertSetTransactionSynchronizationPolicy(String 
policyAttribute) throws Exception, IOException
+    public void testSetRemoteTransactionSynchronizationPolicy() throws 
Exception
     {
         Map<String, Object> hostAttributes = 
waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, 
State.ACTIVE.name());
-        assertEquals("Unexpected synchronization policy before change", 
"NO_SYNC", hostAttributes.get(policyAttribute));
+        assertEquals("Unexpected synchronization policy before change", 
"NO_SYNC", hostAttributes.get(REMOTE_TRANSACTION_SYNCRONIZATION_POLICY));
 
-        Map<String, Object> newPolicy = Collections.<String, 
Object>singletonMap(policyAttribute, "SYNC");
+        Map<String, Object> newPolicy = Collections.<String, 
Object>singletonMap(REMOTE_TRANSACTION_SYNCRONIZATION_POLICY, "SYNC");
         getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newPolicy, 
200);
 
         hostAttributes = 
getRestTestHelper().getJsonAsSingletonList(_virtualhostUrl);
-        assertEquals("Unexpected synchronization policy after change", "SYNC", 
hostAttributes.get(policyAttribute));
+        assertEquals("Unexpected synchronization policy after change", "SYNC", 
hostAttributes.get(REMOTE_TRANSACTION_SYNCRONIZATION_POLICY));
     }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org

Reply via email to