Author: orudyy
Date: Mon Jun  9 17:13:56 2014
New Revision: 1601445

URL: http://svn.apache.org/r1601445
Log:
QPID-5715: Move coalescing committers into environment facades. Flush logs for 
incomplete commit futures if the commit task is stopped

Modified:
    
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
    
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
    
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
    
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
    
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
    
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
    
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java
    
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java
    
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
    
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
    
qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java

Modified: 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java?rev=1601445&r1=1601444&r2=1601445&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
 Mon Jun  9 17:13:56 2014
@@ -127,8 +127,6 @@ public class BDBConfigurationStore imple
 
     private final EnvironmentFacadeFactory _environmentFacadeFactory;
 
-    private volatile Committer _committer;
-
     private String _storeLocation;
     private final BDBMessageStore _messageStoreFacade = new BDBMessageStore();
     private ConfiguredObject<?> _parent;
@@ -641,9 +639,6 @@ public class BDBConfigurationStore imple
                                                                                
           );
                     _storeLocation = _environmentFacade.getStoreLocation();
                 }
-
-                _committer = 
_environmentFacade.createCommitter(parent.getName());
-                _committer.start();
             }
         }
 
@@ -699,22 +694,9 @@ public class BDBConfigurationStore imple
         @Override
         public void closeMessageStore()
         {
-            if (_messageStoreOpen.compareAndSet(true, false))
+            if (_messageStoreOpen.compareAndSet(true, false) && 
!_configurationStoreOpen.get())
             {
-                try
-                {
-                    if (_committer != null)
-                    {
-                        _committer.close();
-                    }
-                }
-                finally
-                {
-                    if (!_configurationStoreOpen.get())
-                    {
-                        closeEnvironment();
-                    }
-                }
+                closeEnvironment();
             }
         }
 
@@ -908,8 +890,7 @@ public class BDBConfigurationStore imple
                             LOGGER.debug("Deleted content for message " + 
messageId);
                         }
 
-                        _environmentFacade.commit(tx);
-                        _committer.commit(tx, sync);
+                        _environmentFacade.commit(tx, sync);
 
                         complete = true;
                         tx = null;
@@ -1334,8 +1315,7 @@ public class BDBConfigurationStore imple
                 throw new StoreException("Fatal internal error: transactional 
is null at commitTran");
             }
 
-            _environmentFacade.commit(tx);
-            StoreFuture result =  _committer.commit(tx, syncCommit);
+            StoreFuture result = _environmentFacade.commit(tx, syncCommit);
 
             if (LOGGER.isDebugEnabled())
             {
@@ -1617,8 +1597,7 @@ public class BDBConfigurationStore imple
                         throw 
_environmentFacade.handleDatabaseException("failed to begin transaction", e);
                     }
                     store(txn);
-                    _environmentFacade.commit(txn);
-                    _committer.commit(txn, true);
+                    _environmentFacade.commit(txn, true);
 
                     storedSizeChangeOccurred(getMetaData().getContentSize());
                 }

Modified: 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java?rev=1601445&r1=1601444&r2=1601445&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
 Mon Jun  9 17:13:56 2014
@@ -26,7 +26,6 @@ import java.util.concurrent.ExecutorServ
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.server.store.StoreFuture;
@@ -38,12 +37,10 @@ import com.sleepycat.je.Transaction;
 public class CoalescingCommiter implements Committer
 {
     private final CommitTask _commitTask;
-    private final AtomicReference<Boolean> _started;
     private final ExecutorService _taskExecutor;
 
     public CoalescingCommiter(final String name, EnvironmentFacade 
environmentFacade)
     {
-        _started = new AtomicReference<Boolean>(false);
         _commitTask = new CommitTask(environmentFacade);
         _taskExecutor = Executors.newSingleThreadExecutor(new ThreadFactory()
         {
@@ -60,7 +57,7 @@ public class CoalescingCommiter implemen
     @Override
     public void start()
     {
-        if (_started.compareAndSet(false, true))
+        if (_commitTask.start())
         {
             _taskExecutor.submit(_commitTask);
         }
@@ -69,10 +66,7 @@ public class CoalescingCommiter implemen
     @Override
     public void stop()
     {
-        if (_started.compareAndSet(true, false))
-        {
-            _commitTask.stop();
-        }
+        _commitTask.stop();
     }
 
     @Override
@@ -80,7 +74,6 @@ public class CoalescingCommiter implemen
     {
         try
         {
-            _started.set(false);
             _commitTask.close();
         }
         finally
@@ -92,7 +85,7 @@ public class CoalescingCommiter implemen
     @Override
     public StoreFuture commit(Transaction tx, boolean syncCommit)
     {
-        if (_started.get())
+        if (isStarted())
         {
             BDBCommitFuture commitFuture = new BDBCommitFuture(_commitTask, 
tx, syncCommit);
             try
@@ -111,7 +104,7 @@ public class CoalescingCommiter implemen
 
     public boolean isStarted()
     {
-        return _started.get();
+        return !_commitTask.isStopped();
     }
 
     private static final class BDBCommitFuture implements StoreFuture
@@ -187,11 +180,6 @@ public class CoalescingCommiter implemen
 
             while (!isComplete())
             {
-                if (_commitTask.isStopped())
-                {
-                    throw new IllegalStateException("Commit thread is 
stopped");
-                }
-
                 _commitTask.explicitNotify();
                 try
                 {
@@ -201,6 +189,24 @@ public class CoalescingCommiter implemen
                 {
                     throw new RuntimeException(e);
                 }
+
+                if (!_commitTask.isClosed() && _commitTask.isStopped() && 
!isComplete())
+                {
+                    // coalesing sync is not required anymore
+                    // flush log and mark transaction as completed
+                    try
+                    {
+                        _commitTask.flushLog();
+                    }
+                    catch(DatabaseException e)
+                    {
+                        _databaseException = e;
+                    }
+                    finally
+                    {
+                        complete();
+                    }
+                }
             }
 
             if(LOGGER.isDebugEnabled())
@@ -224,6 +230,7 @@ public class CoalescingCommiter implemen
         private static final Logger LOGGER = 
Logger.getLogger(CommitTask.class);
 
         private final AtomicBoolean _stopped = new AtomicBoolean(true);
+        private final AtomicBoolean _closed = new AtomicBoolean(false);
         private final Queue<BDBCommitFuture> _jobQueue = new 
ConcurrentLinkedQueue<BDBCommitFuture>();
         private final Object _lock = new Object();
         private final EnvironmentFacade _environmentFacade;
@@ -233,6 +240,11 @@ public class CoalescingCommiter implemen
             _environmentFacade = environmentFacade;
         }
 
+        public boolean isClosed()
+        {
+            return _closed.get();
+        }
+
         public boolean isStopped()
         {
             return _stopped.get();
@@ -249,27 +261,30 @@ public class CoalescingCommiter implemen
         @Override
         public void run()
         {
-            if (_stopped.compareAndSet(true, false))
+            while (!_stopped.get())
             {
-                while (!_stopped.get())
+                synchronized (_lock)
                 {
-                    synchronized (_lock)
+                    while (!_stopped.get() && !hasJobs())
                     {
-                        while (!_stopped.get() && !hasJobs())
+                        try
+                        {
+                            // Periodically wake up and check, just in case we
+                            // missed a notification. Don't want to lock the 
broker hard.
+                            _lock.wait(1000);
+                        }
+                        catch (InterruptedException e)
                         {
-                            try
-                            {
-                                // Periodically wake up and check, just in 
case we
-                                // missed a notification. Don't want to lock 
the broker hard.
-                                _lock.wait(1000);
-                            }
-                            catch (InterruptedException e)
-                            {
-                            }
                         }
                     }
-                    processJobs();
                 }
+                processJobs();
+            }
+
+            // process remaining jobs if such were added whilst stopped
+            if (hasJobs())
+            {
+                processJobs();
             }
         }
 
@@ -285,11 +300,7 @@ public class CoalescingCommiter implemen
                     startTime = System.currentTimeMillis();
                 }
 
-                Environment environment = _environmentFacade.getEnvironment();
-                if (environment != null && environment.isValid())
-                {
-                    environment.flushLog(true);
-                }
+                flushLog();
 
                 if(LOGGER.isDebugEnabled())
                 {
@@ -340,6 +351,15 @@ public class CoalescingCommiter implemen
             }
         }
 
+        private void flushLog()
+        {
+            Environment environment = _environmentFacade.getEnvironment();
+            if (environment != null && environment.isValid())
+            {
+                environment.flushLog(true);
+            }
+        }
+
         private boolean hasJobs()
         {
             return !_jobQueue.isEmpty();
@@ -361,35 +381,44 @@ public class CoalescingCommiter implemen
             }
         }
 
+        public boolean start()
+        {
+            return _stopped.compareAndSet(true, false);
+        }
+
         public void stop()
         {
-            synchronized (_lock)
+            if (_stopped.compareAndSet(false, true))
             {
-                if (_stopped.compareAndSet(false, true) || hasJobs())
+                synchronized (_lock)
                 {
-                    processJobs();
+                    _lock.notifyAll();
                 }
+                _jobQueue.clear();
             }
         }
 
         public void close()
         {
-            RuntimeException e = new RuntimeException("Commit thread has been 
stopped");
-            synchronized (_lock)
+            if (_closed.compareAndSet(false, true))
             {
-                _stopped.set(true);
-                BDBCommitFuture commit = null;
-                int abortedCommits = 0;
-                while ((commit = _jobQueue.poll()) != null)
-                {
-                    abortedCommits++;
-                    commit.abort(e);
-                }
-                if (LOGGER.isDebugEnabled() && abortedCommits > 0)
+                RuntimeException e = new RuntimeException("Commit thread has 
been closed");
+                synchronized (_lock)
                 {
-                    LOGGER.debug(abortedCommits + " commit(s) were aborted 
during close.");
+                    _stopped.set(true);
+                    BDBCommitFuture commit = null;
+                    int abortedCommits = 0;
+                    while ((commit = _jobQueue.poll()) != null)
+                    {
+                        abortedCommits++;
+                        commit.abort(e);
+                    }
+                    if (LOGGER.isDebugEnabled() && abortedCommits > 0)
+                    {
+                        LOGGER.debug(abortedCommits + " commit(s) were aborted 
during close.");
+                    }
+                    _lock.notifyAll();
                 }
-                _lock.notifyAll();
             }
         }
     }

Modified: 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java?rev=1601445&r1=1601444&r2=1601445&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
 Mon Jun  9 17:13:56 2014
@@ -35,36 +35,4 @@ public interface Committer
     void close();
 
     boolean isStarted();
-
-    Committer IMMEDIATE_FUTURE_COMMITTER = new Committer()
-    {
-
-        @Override
-        public void start()
-        {
-        }
-
-        @Override
-        public StoreFuture commit(Transaction tx, boolean syncCommit)
-        {
-            return StoreFuture.IMMEDIATE_FUTURE;
-        }
-
-        @Override
-        public void stop()
-        {
-        }
-
-        @Override
-        public void close()
-        {
-        }
-
-        @Override
-        public boolean isStarted()
-        {
-            return true;
-        }
-    };
-
 }
\ No newline at end of file

Modified: 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java?rev=1601445&r1=1601444&r2=1601445&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
 Mon Jun  9 17:13:56 2014
@@ -24,6 +24,8 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.qpid.server.store.StoreFuture;
+
 import com.sleepycat.je.Database;
 import com.sleepycat.je.DatabaseConfig;
 import com.sleepycat.je.DatabaseEntry;
@@ -50,11 +52,9 @@ public interface EnvironmentFacade
 
     Sequence openSequence(Database database, DatabaseEntry sequenceKey, 
SequenceConfig sequenceConfig);
 
-    Committer createCommitter(String name);
-
     Transaction beginTransaction();
 
-    void commit(com.sleepycat.je.Transaction tx);
+    StoreFuture commit(com.sleepycat.je.Transaction tx, boolean sync);
 
     DatabaseException handleDatabaseException(String contextMessage, 
DatabaseException e);
 

Modified: 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java?rev=1601445&r1=1601444&r2=1601445&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
 Mon Jun  9 17:13:56 2014
@@ -21,15 +21,16 @@
 package org.apache.qpid.server.store.berkeleydb;
 
 import java.io.File;
-import java.net.ServerSocket;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 import com.sleepycat.je.DatabaseEntry;
 import com.sleepycat.je.Sequence;
 import com.sleepycat.je.SequenceConfig;
-import com.sun.org.apache.xerces.internal.dom.DeepNodeListImpl;
+
 import org.apache.log4j.Logger;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.store.StoreFuture;
 
 import com.sleepycat.je.Database;
 import com.sleepycat.je.DatabaseConfig;
@@ -48,6 +49,7 @@ public class StandardEnvironmentFacade i
     private final ConcurrentHashMap<DatabaseEntry, Sequence> _cachedSequences 
= new ConcurrentHashMap<>();
 
     private Environment _environment;
+    private final Committer _committer;
 
     public StandardEnvironmentFacade(String storePath,
                                      Map<String, String> attributes)
@@ -69,6 +71,7 @@ public class StandardEnvironmentFacade i
             }
         }
 
+        String name = (String)attributes.get(ConfiguredObject.NAME);
         EnvironmentConfig envConfig = new EnvironmentConfig();
         envConfig.setAllowCreate(true);
         envConfig.setTransactional(true);
@@ -82,6 +85,9 @@ public class StandardEnvironmentFacade i
         envConfig.setExceptionListener(new LoggingAsyncExceptionListener());
 
         _environment = new Environment(environmentPath, envConfig);
+
+        _committer =  new CoalescingCommiter(name, this);
+        _committer.start();
     }
 
 
@@ -92,7 +98,7 @@ public class StandardEnvironmentFacade i
     }
 
     @Override
-    public void commit(com.sleepycat.je.Transaction tx)
+    public StoreFuture commit(com.sleepycat.je.Transaction tx, boolean 
syncCommit)
     {
         try
         {
@@ -106,14 +112,26 @@ public class StandardEnvironmentFacade i
 
             throw handleDatabaseException("Got DatabaseException on commit", 
de);
         }
+        return _committer.commit(tx, syncCommit);
     }
 
     @Override
     public void close()
     {
-        closeSequences();
-        closeDatabases();
-        closeEnvironment();
+        try
+        {
+            if (_committer != null)
+            {
+                _committer.close();
+            }
+
+            closeSequences();
+            closeDatabases();
+        }
+        finally
+        {
+            closeEnvironment();
+        }
     }
 
     private void closeSequences()
@@ -296,12 +314,6 @@ public class StandardEnvironmentFacade i
     }
 
     @Override
-    public Committer createCommitter(String name)
-    {
-        return new CoalescingCommiter(name, this);
-    }
-
-    @Override
     public String getStoreLocation()
     {
         return _storePath;

Modified: 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java?rev=1601445&r1=1601444&r2=1601445&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
 Mon Jun  9 17:13:56 2014
@@ -45,9 +45,10 @@ import java.util.concurrent.atomic.Atomi
 import com.sleepycat.je.DatabaseEntry;
 import com.sleepycat.je.Sequence;
 import com.sleepycat.je.SequenceConfig;
+
 import org.apache.log4j.Logger;
+import org.apache.qpid.server.store.StoreFuture;
 import org.apache.qpid.server.store.berkeleydb.CoalescingCommiter;
-import org.apache.qpid.server.store.berkeleydb.Committer;
 import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
 import org.apache.qpid.server.store.berkeleydb.LoggingAsyncExceptionListener;
 import org.apache.qpid.server.util.DaemonThreadFactory;
@@ -213,7 +214,7 @@ public class ReplicatedEnvironmentFacade
     }
 
     @Override
-    public void commit(final Transaction tx)
+    public StoreFuture commit(final Transaction tx, boolean syncCommit)
     {
         try
         {
@@ -225,6 +226,7 @@ public class ReplicatedEnvironmentFacade
         {
             throw handleDatabaseException("Got DatabaseException on commit, 
closing environment", de);
         }
+        return _coalescingCommiter.commit(tx, syncCommit);
     }
 
     @Override
@@ -246,6 +248,7 @@ public class ReplicatedEnvironmentFacade
 
                 try
                 {
+                    _coalescingCommiter.close();
                     closeSequences();
                     closeDatabases();
                 }
@@ -1036,12 +1039,6 @@ public class ReplicatedEnvironmentFacade
         return environment;
     }
 
-    @Override
-    public Committer createCommitter(String name)
-    {
-        return _coalescingCommiter;
-    }
-
     NodeState getRemoteNodeState(ReplicationNode repNode) throws IOException, 
ServiceConnectFailedException
     {
         if (repNode == null)

Modified: 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java?rev=1601445&r1=1601444&r2=1601445&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHost.java
 Mon Jun  9 17:13:56 2014
@@ -28,8 +28,8 @@ import org.apache.qpid.server.virtualhos
 
 public interface BDBHAVirtualHost<X extends BDBHAVirtualHost<X>> extends 
VirtualHostImpl<X, AMQQueue<?>, ExchangeImpl<?>>
 {
-    String REMOTE_TRANSACTION_SYNCRONIZATION_POLICY = 
"remoteTransactionSynchronizationPolicy";
-    String LOCAL_TRANSACTION_SYNCRONIZATION_POLICY = 
"localTransactionSynchronizationPolicy";
+    String REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY = 
"remoteTransactionSynchronizationPolicy";
+    String LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY = 
"localTransactionSynchronizationPolicy";
     String COALESCING_SYNC = "coalescingSync";
     String REPLICA_ACKNOWLEDGMENT_POLICY = "replicaAcknowledgmentPolicy";
 

Modified: 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java?rev=1601445&r1=1601444&r2=1601445&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAVirtualHostImpl.java
 Mon Jun  9 17:13:56 2014
@@ -131,13 +131,13 @@ public class BDBHAVirtualHostImpl extend
     {
         super.validateChange(proxyForValidation, changedAttributes);
 
-        if(changedAttributes.contains(LOCAL_TRANSACTION_SYNCRONIZATION_POLICY))
+        
if(changedAttributes.contains(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY))
         {
             String policy = 
((BDBHAVirtualHost<?>)proxyForValidation).getLocalTransactionSynchronizationPolicy();
             validateTransactionSynchronizationPolicy(policy);
         }
 
-        
if(changedAttributes.contains(REMOTE_TRANSACTION_SYNCRONIZATION_POLICY))
+        
if(changedAttributes.contains(REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY))
         {
             String policy = 
((BDBHAVirtualHost<?>)proxyForValidation).getRemoteTransactionSynchronizationPolicy();
             validateTransactionSynchronizationPolicy(policy);

Modified: 
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java?rev=1601445&r1=1601444&r2=1601445&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
 Mon Jun  9 17:13:56 2014
@@ -472,8 +472,8 @@ public class BDBHAVirtualHostNodeTest ex
         assertTrue("CoalescingSync is not ON", virtualHost.isCoalescingSync());
 
         Map<String, Object> virtualHostAttributes = new 
HashMap<String,Object>();
-        
virtualHostAttributes.put(BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCRONIZATION_POLICY,
 "WRITE_NO_SYNC");
-        
virtualHostAttributes.put(BDBHAVirtualHost.REMOTE_TRANSACTION_SYNCRONIZATION_POLICY,
 "SYNC");
+        
virtualHostAttributes.put(BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY,
 "WRITE_NO_SYNC");
+        
virtualHostAttributes.put(BDBHAVirtualHost.REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY,
 "SYNC");
         virtualHost.setAttributes(virtualHostAttributes);
 
         awaitForAttributeChange(virtualHost, 
BDBHAVirtualHostImpl.COALESCING_SYNC, false);
@@ -482,7 +482,7 @@ public class BDBHAVirtualHostNodeTest ex
         assertFalse("CoalescingSync is not OFF", 
virtualHost.isCoalescingSync());
         try
         {
-            virtualHost.setAttributes(Collections.<String, 
Object>singletonMap(BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCRONIZATION_POLICY, 
"INVALID"));
+            virtualHost.setAttributes(Collections.<String, 
Object>singletonMap(BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY, 
"INVALID"));
             fail("Invalid syncronization policy is set");
         }
         catch(IllegalArgumentException e)
@@ -492,7 +492,7 @@ public class BDBHAVirtualHostNodeTest ex
 
         try
         {
-            virtualHost.setAttributes(Collections.<String, 
Object>singletonMap(BDBHAVirtualHost.REMOTE_TRANSACTION_SYNCRONIZATION_POLICY, 
"INVALID"));
+            virtualHost.setAttributes(Collections.<String, 
Object>singletonMap(BDBHAVirtualHost.REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, 
"INVALID"));
             fail("Invalid syncronization policy is set");
         }
         catch(IllegalArgumentException e)

Modified: 
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java?rev=1601445&r1=1601444&r2=1601445&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
 Mon Jun  9 17:13:56 2014
@@ -32,7 +32,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.qpid.server.store.berkeleydb.Committer;
 import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
 import org.apache.qpid.test.utils.QpidTestCase;
 import org.apache.qpid.test.utils.TestFileUtils;
@@ -169,29 +168,13 @@ public class ReplicatedEnvironmentFacade
         ReplicatedEnvironmentFacade master = createMaster();
         assertEquals("Unexpected message store durability", TEST_DURABILITY, 
master.getMessageStoreTransactionDurability());
         assertEquals("Unexpected durability", TEST_DURABILITY, 
master.getDurability());
-        Committer committer = master.createCommitter(TEST_GROUP_NAME);
-        committer.start();
-
-        waitForCommitter(committer, true);
-
+        assertFalse("Coalescing syn before policy set to SYNC", 
master.isCoalescingSync());
+        
master.setMessageStoreLocalTransactionSynchronizationPolicy(SyncPolicy.SYNC);
+        assertTrue("Coalescing syn after policy set to SYNC", 
master.isCoalescingSync());
         assertEquals("Unexpected message store durability after committer 
start", "NO_SYNC,NO_SYNC,SIMPLE_MAJORITY", 
master.getMessageStoreTransactionDurability().toString());
-
-        committer.stop();
-        waitForCommitter(committer, false);
-        assertEquals("Unexpected message store durability after committer 
stop", TEST_DURABILITY, master.getMessageStoreTransactionDurability());
-    }
-
-    public void testIsCoalescingSync() throws Exception
-    {
-        ReplicatedEnvironmentFacade master = createMaster();
-        assertEquals("Unexpected coalescing sync", false, 
master.isCoalescingSync());
-        Committer committer = master.createCommitter(TEST_GROUP_NAME);
-        committer.start();
-        waitForCommitter(committer, true);
-        assertEquals("Unexpected coalescing sync", true, 
master.isCoalescingSync());
-        committer.stop();
-        waitForCommitter(committer, false);
-        assertEquals("Unexpected coalescing sync", false, 
master.isCoalescingSync());
+        
master.setMessageStoreLocalTransactionSynchronizationPolicy(SyncPolicy.WRITE_NO_SYNC);
+        assertEquals("Unexpected message store durability after committer 
stop", "WRITE_NO_SYNC,NO_SYNC,SIMPLE_MAJORITY", 
master.getMessageStoreTransactionDurability().toString());
+        assertFalse("Coalescing syn after policy set to WRITE_NO_SYNC", 
master.isCoalescingSync());
     }
 
     public void testGetNodeState() throws Exception
@@ -719,17 +702,6 @@ public class ReplicatedEnvironmentFacade
         return addNode(TEST_NODE_NAME, TEST_NODE_HOST_PORT, 
TEST_DESIGNATED_PRIMARY, desiredState, stateChangeListener, 
replicationGroupListener);
     }
 
-    private void waitForCommitter(Committer committer, boolean expected) 
throws InterruptedException
-    {
-        int counter = 0;
-        while(committer.isStarted() != expected && counter < 100)
-        {
-            Thread.sleep(20);
-            counter++;
-        }
-        assertEquals("Committer is not in expected state", expected, 
committer.isStarted());
-    }
-
     private ReplicatedEnvironmentConfiguration 
createReplicatedEnvironmentConfiguration(String nodeName, String nodeHostPort, 
boolean designatedPrimary)
     {
         ReplicatedEnvironmentConfiguration node = 
mock(ReplicatedEnvironmentConfiguration.class);

Modified: 
qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java?rev=1601445&r1=1601444&r2=1601445&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostRestTest.java
 Mon Jun  9 17:13:56 2014
@@ -20,8 +20,8 @@
  */
 package org.apache.qpid.server.store.berkeleydb;
 
-import static 
org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCRONIZATION_POLICY;
-import static 
org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost.REMOTE_TRANSACTION_SYNCRONIZATION_POLICY;
+import static 
org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost.LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY;
+import static 
org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost.REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY;
 
 import java.io.File;
 import java.io.IOException;
@@ -96,24 +96,24 @@ public class BDBHAVirtualHostRestTest ex
     public void testSetLocalTransactionSynchronizationPolicy() throws Exception
     {
         Map<String, Object> hostAttributes = 
waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, 
State.ACTIVE.name());
-        assertEquals("Unexpected synchronization policy before change", 
"SYNC", hostAttributes.get(LOCAL_TRANSACTION_SYNCRONIZATION_POLICY));
+        assertEquals("Unexpected synchronization policy before change", 
"SYNC", hostAttributes.get(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY));
 
-        Map<String, Object> newPolicy = Collections.<String, 
Object>singletonMap(LOCAL_TRANSACTION_SYNCRONIZATION_POLICY, "NO_SYNC");
+        Map<String, Object> newPolicy = Collections.<String, 
Object>singletonMap(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY, "NO_SYNC");
         getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newPolicy, 
200);
 
         hostAttributes = 
getRestTestHelper().getJsonAsSingletonList(_virtualhostUrl);
-        assertEquals("Unexpected synchronization policy after change", 
"NO_SYNC", hostAttributes.get(LOCAL_TRANSACTION_SYNCRONIZATION_POLICY));
+        assertEquals("Unexpected synchronization policy after change", 
"NO_SYNC", hostAttributes.get(LOCAL_TRANSACTION_SYNCHRONIZATION_POLICY));
     }
 
     public void testSetRemoteTransactionSynchronizationPolicy() throws 
Exception
     {
         Map<String, Object> hostAttributes = 
waitForAttributeChanged(_virtualhostUrl, VirtualHost.STATE, 
State.ACTIVE.name());
-        assertEquals("Unexpected synchronization policy before change", 
"NO_SYNC", hostAttributes.get(REMOTE_TRANSACTION_SYNCRONIZATION_POLICY));
+        assertEquals("Unexpected synchronization policy before change", 
"NO_SYNC", hostAttributes.get(REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY));
 
-        Map<String, Object> newPolicy = Collections.<String, 
Object>singletonMap(REMOTE_TRANSACTION_SYNCRONIZATION_POLICY, "SYNC");
+        Map<String, Object> newPolicy = Collections.<String, 
Object>singletonMap(REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY, "SYNC");
         getRestTestHelper().submitRequest(_virtualhostUrl, "PUT", newPolicy, 
200);
 
         hostAttributes = 
getRestTestHelper().getJsonAsSingletonList(_virtualhostUrl);
-        assertEquals("Unexpected synchronization policy after change", "SYNC", 
hostAttributes.get(REMOTE_TRANSACTION_SYNCRONIZATION_POLICY));
+        assertEquals("Unexpected synchronization policy after change", "SYNC", 
hostAttributes.get(REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY));
     }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org

Reply via email to