Author: kwall
Date: Mon Jun 9 11:55:32 2014
New Revision: 1601351
URL: http://svn.apache.org/r1601351
Log:
QPID-5801: [Java Broker] BDB: Cache the sequence handler used for message
sequence number
Modified:
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
Modified:
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java?rev=1601351&r1=1601350&r2=1601351&view=diff
==============================================================================
---
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
(original)
+++
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
Mon Jun 9 11:55:32 2014
@@ -26,7 +26,6 @@ import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -130,8 +129,6 @@ public class BDBConfigurationStore imple
private volatile Committer _committer;
- private boolean _isMessageStoreProvider;
-
private String _storeLocation;
private final BDBMessageStore _messageStoreFacade = new BDBMessageStore();
private ConfiguredObject<?> _parent;
@@ -241,8 +238,8 @@ public class BDBConfigurationStore imple
for (ConfiguredObjectRecord record : configuredObjects.values())
{
- boolean shoudlContinue = handler.handle(record);
- if (!shoudlContinue)
+ boolean shouldContinue = handler.handle(record);
+ if (!shouldContinue)
{
break;
}
@@ -670,27 +667,18 @@ public class BDBConfigurationStore imple
public <T extends StorableMessageMetaData> StoredMessage<T>
addMessage(T metaData)
{
- Sequence mmdSeq = null;
- try
- {
- mmdSeq = getMessageMetaDataSeqDb().openSequence(null,
MESSAGE_METADATA_SEQ_KEY, MESSAGE_METADATA_SEQ_CONFIG);
- long newMessageId = mmdSeq.get(null, 1);
+ Sequence mmdSeq =
_environmentFacade.openSequence(getMessageMetaDataSeqDb(),
+
MESSAGE_METADATA_SEQ_KEY,
+
MESSAGE_METADATA_SEQ_CONFIG);
+ long newMessageId = mmdSeq.get(null, 1);
- if (metaData.isPersistent())
- {
- return (StoredMessage<T>) new
StoredBDBMessage(newMessageId, metaData);
- }
- else
- {
- return new StoredMemoryMessage<T>(newMessageId, metaData);
- }
+ if (metaData.isPersistent())
+ {
+ return (StoredMessage<T>) new StoredBDBMessage(newMessageId,
metaData);
}
- finally
+ else
{
- if (mmdSeq != null)
- {
- mmdSeq.close();
- }
+ return new StoredMemoryMessage<T>(newMessageId, metaData);
}
}
Modified:
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java?rev=1601351&r1=1601350&r2=1601351&view=diff
==============================================================================
---
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
(original)
+++
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
Mon Jun 9 11:55:32 2014
@@ -26,9 +26,12 @@ import java.util.Map;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.Sequence;
+import com.sleepycat.je.SequenceConfig;
import com.sleepycat.je.Transaction;
public interface EnvironmentFacade
@@ -43,7 +46,9 @@ public interface EnvironmentFacade
Environment getEnvironment();
- Database openDatabase(String name, DatabaseConfig databaseConfig);
+ Database openDatabase(String databaseName, DatabaseConfig databaseConfig);
+
+ Sequence openSequence(Database database, DatabaseEntry sequenceKey,
SequenceConfig sequenceConfig);
Committer createCommitter(String name);
@@ -57,4 +62,5 @@ public interface EnvironmentFacade
void closeDatabase(String name);
void close();
+
}
Modified:
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java?rev=1601351&r1=1601350&r2=1601351&view=diff
==============================================================================
---
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
(original)
+++
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
Mon Jun 9 11:55:32 2014
@@ -21,9 +21,14 @@
package org.apache.qpid.server.store.berkeleydb;
import java.io.File;
+import java.net.ServerSocket;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.Sequence;
+import com.sleepycat.je.SequenceConfig;
+import com.sun.org.apache.xerces.internal.dom.DeepNodeListImpl;
import org.apache.log4j.Logger;
import com.sleepycat.je.Database;
@@ -40,6 +45,7 @@ public class StandardEnvironmentFacade i
private final String _storePath;
private final ConcurrentHashMap<String, Database> _cachedDatabases = new
ConcurrentHashMap<>();
+ private final ConcurrentHashMap<DatabaseEntry, Sequence> _cachedSequences
= new ConcurrentHashMap<>();
private Environment _environment;
@@ -105,20 +111,44 @@ public class StandardEnvironmentFacade i
@Override
public void close()
{
+ closeSequences();
closeDatabases();
closeEnvironment();
}
+ private void closeSequences()
+ {
+ RuntimeException firstThrownException = null;
+ for (DatabaseEntry sequenceKey : _cachedSequences.keySet())
+ {
+ try
+ {
+ closeSequence(sequenceKey);
+ }
+ catch(DatabaseException de)
+ {
+ if (firstThrownException == null)
+ {
+ firstThrownException = de;
+ }
+ }
+ }
+ if (firstThrownException != null)
+ {
+ throw firstThrownException;
+ }
+ }
+
private void closeDatabases()
{
RuntimeException firstThrownException = null;
- for (Database database : _cachedDatabases.values())
+ for (String databaseName : _cachedDatabases.keySet())
{
try
{
- database.close();
+ closeDatabase(databaseName);
}
- catch(RuntimeException e)
+ catch(DatabaseException e)
{
if (firstThrownException == null)
{
@@ -223,6 +253,39 @@ public class StandardEnvironmentFacade i
}
@Override
+ public Sequence openSequence(final Database database,
+ final DatabaseEntry sequenceKey,
+ final SequenceConfig sequenceConfig)
+ {
+ Sequence cachedSequence = _cachedSequences.get(sequenceKey);
+ if (cachedSequence == null)
+ {
+ Sequence handle = database.openSequence(null, sequenceKey,
sequenceConfig);
+ Sequence existingHandle =
_cachedSequences.putIfAbsent(sequenceKey, handle);
+ if (existingHandle == null)
+ {
+ cachedSequence = handle;
+ }
+ else
+ {
+ cachedSequence = existingHandle;
+ handle.close();
+ }
+ }
+ return cachedSequence;
+ }
+
+
+ private void closeSequence(final DatabaseEntry sequenceKey)
+ {
+ Sequence cachedHandle = _cachedSequences.remove(sequenceKey);
+ if (cachedHandle != null)
+ {
+ cachedHandle.close();
+ }
+ }
+
+ @Override
public void closeDatabase(final String name)
{
Database cachedHandle = _cachedDatabases.remove(name);
Modified:
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java?rev=1601351&r1=1601350&r2=1601351&view=diff
==============================================================================
---
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
(original)
+++
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
Mon Jun 9 11:55:32 2014
@@ -42,6 +42,9 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.Sequence;
+import com.sleepycat.je.SequenceConfig;
import org.apache.log4j.Logger;
import org.apache.qpid.server.store.berkeleydb.CoalescingCommiter;
import org.apache.qpid.server.store.berkeleydb.Committer;
@@ -160,6 +163,7 @@ public class ReplicatedEnvironmentFacade
private volatile SyncPolicy
_messageStoreRemoteTransactionSyncronizationPolicy =
REMOTE_TRANSACTION_SYNCHRONIZATION_POLICY;
private final ConcurrentHashMap<String, Database> _cachedDatabases = new
ConcurrentHashMap<>();
+ private final ConcurrentHashMap<DatabaseEntry, Sequence> _cachedSequences
= new ConcurrentHashMap<>();
public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration
configuration)
{
@@ -242,6 +246,7 @@ public class ReplicatedEnvironmentFacade
try
{
+ closeSequences();
closeDatabases();
}
finally
@@ -377,6 +382,38 @@ public class ReplicatedEnvironmentFacade
}
}
}
+ @Override
+ public Sequence openSequence(final Database database,
+ final DatabaseEntry sequenceKey,
+ final SequenceConfig sequenceConfig)
+ {
+ Sequence cachedSequence = _cachedSequences.get(sequenceKey);
+ if (cachedSequence == null)
+ {
+ Sequence handle = database.openSequence(null, sequenceKey,
sequenceConfig);
+ Sequence existingHandle =
_cachedSequences.putIfAbsent(sequenceKey, handle);
+ if (existingHandle == null)
+ {
+ cachedSequence = handle;
+ }
+ else
+ {
+ cachedSequence = existingHandle;
+ handle.close();
+ }
+ }
+ return cachedSequence;
+ }
+
+
+ private void closeSequence(final DatabaseEntry sequenceKey)
+ {
+ Sequence cachedHandle = _cachedSequences.remove(sequenceKey);
+ if (cachedHandle != null)
+ {
+ cachedHandle.close();
+ }
+ }
@Override
public String getStoreLocation()
@@ -676,11 +713,6 @@ public class ReplicatedEnvironmentFacade
createReplicationGroupAdmin().removeMember(nodeName);
}
- public void updateAddress(final String nodeName, final String newHostName,
final int newPort)
- {
- createReplicationGroupAdmin().updateAddress(nodeName, newHostName,
newPort);
- }
-
public long getJoinTime()
{
return _joinTime;
@@ -782,7 +814,7 @@ public class ReplicatedEnvironmentFacade
@Override
public void stateChange(StateChangeEvent
stateChangeEvent) throws RuntimeException
{
- if (LOGGER.isInfoEnabled())
+ if (LOGGER.isDebugEnabled())
{
LOGGER.debug(
"When restarting a state change event
is received on NOOP listener for state:"
@@ -794,6 +826,7 @@ public class ReplicatedEnvironmentFacade
try
{
+ closeSequences();
closeDatabases();
}
catch(Exception e)
@@ -810,6 +843,29 @@ public class ReplicatedEnvironmentFacade
}
}
+ private void closeSequences()
+ {
+ RuntimeException firstThrownException = null;
+ for (DatabaseEntry sequenceKey : _cachedSequences.keySet())
+ {
+ try
+ {
+ closeSequence(sequenceKey);
+ }
+ catch(DatabaseException de)
+ {
+ if (firstThrownException == null)
+ {
+ firstThrownException = de;
+ }
+ }
+ }
+ if (firstThrownException != null)
+ {
+ throw firstThrownException;
+ }
+ }
+
private void closeDatabases()
{
RuntimeException firstThrownException = null;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]