Author: kwall
Date: Mon Jun  9 11:55:32 2014
New Revision: 1601351

URL: http://svn.apache.org/r1601351
Log:
QPID-5801: [Java Broker] BDB: Cache the sequence handler used for message 
sequence number

Modified:
    
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
    
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
    
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
    
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java

Modified: 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java?rev=1601351&r1=1601350&r2=1601351&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
 Mon Jun  9 11:55:32 2014
@@ -26,7 +26,6 @@ import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -130,8 +129,6 @@ public class BDBConfigurationStore imple
 
     private volatile Committer _committer;
 
-    private boolean _isMessageStoreProvider;
-
     private String _storeLocation;
     private final BDBMessageStore _messageStoreFacade = new BDBMessageStore();
     private ConfiguredObject<?> _parent;
@@ -241,8 +238,8 @@ public class BDBConfigurationStore imple
 
         for (ConfiguredObjectRecord record : configuredObjects.values())
         {
-            boolean shoudlContinue = handler.handle(record);
-            if (!shoudlContinue)
+            boolean shouldContinue = handler.handle(record);
+            if (!shouldContinue)
             {
                 break;
             }
@@ -670,27 +667,18 @@ public class BDBConfigurationStore imple
         public <T extends StorableMessageMetaData> StoredMessage<T> 
addMessage(T metaData)
         {
 
-            Sequence mmdSeq = null;
-            try
-            {
-                mmdSeq = getMessageMetaDataSeqDb().openSequence(null, 
MESSAGE_METADATA_SEQ_KEY, MESSAGE_METADATA_SEQ_CONFIG);
-                long newMessageId = mmdSeq.get(null, 1);
+            Sequence mmdSeq = 
_environmentFacade.openSequence(getMessageMetaDataSeqDb(),
+                                                              
MESSAGE_METADATA_SEQ_KEY,
+                                                              
MESSAGE_METADATA_SEQ_CONFIG);
+            long newMessageId = mmdSeq.get(null, 1);
 
-                if (metaData.isPersistent())
-                {
-                    return (StoredMessage<T>) new 
StoredBDBMessage(newMessageId, metaData);
-                }
-                else
-                {
-                    return new StoredMemoryMessage<T>(newMessageId, metaData);
-                }
+            if (metaData.isPersistent())
+            {
+                return (StoredMessage<T>) new StoredBDBMessage(newMessageId, 
metaData);
             }
-            finally
+            else
             {
-                if (mmdSeq != null)
-                {
-                    mmdSeq.close();
-                }
+                return new StoredMemoryMessage<T>(newMessageId, metaData);
             }
         }
 

Modified: 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java?rev=1601351&r1=1601350&r2=1601351&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
 Mon Jun  9 11:55:32 2014
@@ -26,9 +26,12 @@ import java.util.Map;
 
 import com.sleepycat.je.Database;
 import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseEntry;
 import com.sleepycat.je.DatabaseException;
 import com.sleepycat.je.Environment;
 import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.Sequence;
+import com.sleepycat.je.SequenceConfig;
 import com.sleepycat.je.Transaction;
 
 public interface EnvironmentFacade
@@ -43,7 +46,9 @@ public interface EnvironmentFacade
 
     Environment getEnvironment();
 
-    Database openDatabase(String name, DatabaseConfig databaseConfig);
+    Database openDatabase(String databaseName, DatabaseConfig databaseConfig);
+
+    Sequence openSequence(Database database, DatabaseEntry sequenceKey, 
SequenceConfig sequenceConfig);
 
     Committer createCommitter(String name);
 
@@ -57,4 +62,5 @@ public interface EnvironmentFacade
 
     void closeDatabase(String name);
     void close();
+
 }

Modified: 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java?rev=1601351&r1=1601350&r2=1601351&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
 Mon Jun  9 11:55:32 2014
@@ -21,9 +21,14 @@
 package org.apache.qpid.server.store.berkeleydb;
 
 import java.io.File;
+import java.net.ServerSocket;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.Sequence;
+import com.sleepycat.je.SequenceConfig;
+import com.sun.org.apache.xerces.internal.dom.DeepNodeListImpl;
 import org.apache.log4j.Logger;
 
 import com.sleepycat.je.Database;
@@ -40,6 +45,7 @@ public class StandardEnvironmentFacade i
 
     private final String _storePath;
     private final ConcurrentHashMap<String, Database> _cachedDatabases = new 
ConcurrentHashMap<>();
+    private final ConcurrentHashMap<DatabaseEntry, Sequence> _cachedSequences 
= new ConcurrentHashMap<>();
 
     private Environment _environment;
 
@@ -105,20 +111,44 @@ public class StandardEnvironmentFacade i
     @Override
     public void close()
     {
+        closeSequences();
         closeDatabases();
         closeEnvironment();
     }
 
+    private void closeSequences()
+    {
+        RuntimeException firstThrownException = null;
+        for (DatabaseEntry  sequenceKey : _cachedSequences.keySet())
+        {
+            try
+            {
+                closeSequence(sequenceKey);
+            }
+            catch(DatabaseException de)
+            {
+                if (firstThrownException == null)
+                {
+                    firstThrownException = de;
+                }
+            }
+        }
+        if (firstThrownException != null)
+        {
+            throw firstThrownException;
+        }
+    }
+
     private void closeDatabases()
     {
         RuntimeException firstThrownException = null;
-        for (Database database : _cachedDatabases.values())
+        for (String databaseName : _cachedDatabases.keySet())
         {
             try
             {
-                database.close();
+                closeDatabase(databaseName);
             }
-            catch(RuntimeException e)
+            catch(DatabaseException e)
             {
                 if (firstThrownException == null)
                 {
@@ -223,6 +253,39 @@ public class StandardEnvironmentFacade i
     }
 
     @Override
+    public Sequence openSequence(final Database database,
+                                 final DatabaseEntry sequenceKey,
+                                 final SequenceConfig sequenceConfig)
+    {
+        Sequence cachedSequence = _cachedSequences.get(sequenceKey);
+        if (cachedSequence == null)
+        {
+            Sequence handle = database.openSequence(null, sequenceKey, 
sequenceConfig);
+            Sequence existingHandle = 
_cachedSequences.putIfAbsent(sequenceKey, handle);
+            if (existingHandle == null)
+            {
+                cachedSequence = handle;
+            }
+            else
+            {
+                cachedSequence = existingHandle;
+                handle.close();
+            }
+        }
+        return cachedSequence;
+    }
+
+
+    private void closeSequence(final DatabaseEntry sequenceKey)
+    {
+        Sequence cachedHandle = _cachedSequences.remove(sequenceKey);
+        if (cachedHandle != null)
+        {
+            cachedHandle.close();
+        }
+    }
+
+    @Override
     public void closeDatabase(final String name)
     {
         Database cachedHandle = _cachedDatabases.remove(name);

Modified: 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java?rev=1601351&r1=1601350&r2=1601351&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
 Mon Jun  9 11:55:32 2014
@@ -42,6 +42,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.Sequence;
+import com.sleepycat.je.SequenceConfig;
 import org.apache.log4j.Logger;
 import org.apache.qpid.server.store.berkeleydb.CoalescingCommiter;
 import org.apache.qpid.server.store.berkeleydb.Committer;
@@ -160,6 +163,7 @@ public class ReplicatedEnvironmentFacade
     private volatile SyncPolicy 
_messageStoreRemoteTransactionSyncronizationPolicy = 
REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY;
 
     private final ConcurrentHashMap<String, Database> _cachedDatabases = new 
ConcurrentHashMap<>();
+    private final ConcurrentHashMap<DatabaseEntry, Sequence> _cachedSequences 
= new ConcurrentHashMap<>();
 
     public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration 
configuration)
     {
@@ -242,6 +246,7 @@ public class ReplicatedEnvironmentFacade
 
                 try
                 {
+                    closeSequences();
                     closeDatabases();
                 }
                 finally
@@ -377,6 +382,38 @@ public class ReplicatedEnvironmentFacade
             }
         }
     }
+    @Override
+    public Sequence openSequence(final Database database,
+                                 final DatabaseEntry sequenceKey,
+                                 final SequenceConfig sequenceConfig)
+    {
+        Sequence cachedSequence = _cachedSequences.get(sequenceKey);
+        if (cachedSequence == null)
+        {
+            Sequence handle = database.openSequence(null, sequenceKey, 
sequenceConfig);
+            Sequence existingHandle = 
_cachedSequences.putIfAbsent(sequenceKey, handle);
+            if (existingHandle == null)
+            {
+                cachedSequence = handle;
+            }
+            else
+            {
+                cachedSequence = existingHandle;
+                handle.close();
+            }
+        }
+        return cachedSequence;
+    }
+
+
+    private void closeSequence(final DatabaseEntry sequenceKey)
+    {
+        Sequence cachedHandle = _cachedSequences.remove(sequenceKey);
+        if (cachedHandle != null)
+        {
+            cachedHandle.close();
+        }
+    }
 
     @Override
     public String getStoreLocation()
@@ -676,11 +713,6 @@ public class ReplicatedEnvironmentFacade
         createReplicationGroupAdmin().removeMember(nodeName);
     }
 
-    public void updateAddress(final String nodeName, final String newHostName, 
final int newPort)
-    {
-        createReplicationGroupAdmin().updateAddress(nodeName, newHostName, 
newPort);
-    }
-
     public long getJoinTime()
     {
         return _joinTime;
@@ -782,7 +814,7 @@ public class ReplicatedEnvironmentFacade
                         @Override
                         public void stateChange(StateChangeEvent 
stateChangeEvent) throws RuntimeException
                         {
-                            if (LOGGER.isInfoEnabled())
+                            if (LOGGER.isDebugEnabled())
                             {
                                 LOGGER.debug(
                                         "When restarting a state change event 
is received on NOOP listener for state:"
@@ -794,6 +826,7 @@ public class ReplicatedEnvironmentFacade
 
                 try
                 {
+                    closeSequences();
                     closeDatabases();
                 }
                 catch(Exception e)
@@ -810,6 +843,29 @@ public class ReplicatedEnvironmentFacade
         }
     }
 
+    private void closeSequences()
+    {
+        RuntimeException firstThrownException = null;
+        for (DatabaseEntry  sequenceKey : _cachedSequences.keySet())
+        {
+            try
+            {
+                closeSequence(sequenceKey);
+            }
+            catch(DatabaseException de)
+            {
+                if (firstThrownException == null)
+                {
+                    firstThrownException = de;
+                }
+            }
+        }
+        if (firstThrownException != null)
+        {
+            throw firstThrownException;
+        }
+    }
+
     private void closeDatabases()
     {
         RuntimeException firstThrownException = null;



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to