Author: rgodfrey
Date: Sat Jan 21 19:58:14 2012
New Revision: 1234410
URL: http://svn.apache.org/viewvc?rev=1234410&view=rev
Log:
QPID-3774 : Work around Java BDB issue with cursors and flushLog
Modified:
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
Modified:
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java?rev=1234410&r1=1234409&r2=1234410&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java Sat Jan 21 19:58:14 2012
@@ -37,6 +37,7 @@ import java.util.concurrent.atomic.Atomi
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import com.sleepycat.bind.tuple.IntegerBinding;
import com.sleepycat.bind.tuple.LongBinding;
import com.sleepycat.bind.tuple.StringBinding;
import com.sleepycat.je.*;
@@ -899,7 +900,6 @@ public class BDBMessageStore implements
boolean complete = false;
com.sleepycat.je.Transaction tx = null;
- Cursor cursor = null;
Random rand = null;
int attempts = 0;
try
@@ -907,7 +907,6 @@ public class BDBMessageStore implements
do
{
tx = null;
- cursor = null;
try
{
tx = _environment.beginTransaction(null, null);
@@ -936,76 +935,43 @@ public class BDBMessageStore implements
//now remove the content data from the store if there is any.
- DatabaseEntry contentKeyEntry = new DatabaseEntry();
- MessageContentKey_5 mck = new
MessageContentKey_5(messageId,0);
-
- TupleBinding<MessageContentKey> contentKeyTupleBinding =
new MessageContentKeyTB_5();
- contentKeyTupleBinding.objectToEntry(mck, contentKeyEntry);
- //Use a partial record for the value to prevent retrieving
the
- //data itself as we only need the key to identify what to
remove.
- DatabaseEntry value = new DatabaseEntry();
- value.setPartial(0, 0, true);
- cursor = _messageContentDb.openCursor(tx, null);
-
- status = cursor.getSearchKeyRange(contentKeyEntry, value,
LockMode.RMW);
- while (status == OperationStatus.SUCCESS)
+ int offset = 0;
+ do
{
- mck = (MessageContentKey_5)
contentKeyTupleBinding.entryToObject(contentKeyEntry);
+ DatabaseEntry contentKeyEntry = new DatabaseEntry();
+ MessageContentKey_5 mck = new
MessageContentKey_5(messageId,offset);
+ TupleBinding<MessageContentKey> contentKeyTupleBinding
= new MessageContentKeyTB_5();
+ contentKeyTupleBinding.objectToEntry(mck,
contentKeyEntry);
+ //Use a partial record for the value to prevent retrieving the
+ //data itself as we only need the key to identify what to remove.
+ DatabaseEntry value = new DatabaseEntry();
+ value.setPartial(0, 4, true);
- if(mck.getMessageId() != messageId)
- {
- //we have exhausted all chunks for this message
id, break
- break;
- }
- else
- {
- status = cursor.delete();
-
- if(status == OperationStatus.NOTFOUND)
- {
- cursor.close();
- cursor = null;
+ status = _messageContentDb.get(null,contentKeyEntry,
value, LockMode.READ_COMMITTED);
- tx.abort();
- throw new AMQStoreException("Content chunk
offset" + mck.getOffset() + " not found for message " + messageId);
- }
+ if(status == OperationStatus.SUCCESS)
+ {
+ offset += IntegerBinding.entryToInt(value);
+ _messageContentDb.delete(tx, contentKeyEntry);
if (_log.isDebugEnabled())
{
_log.debug("Deleted content chunk offset " +
mck.getOffset() + " for message " + messageId);
}
}
-
- status = cursor.getNext(contentKeyEntry, value,
LockMode.RMW);
}
-
- cursor.close();
-
- cursor = null;
+ while (status == OperationStatus.SUCCESS);
commit(tx, sync);
complete = true;
+ tx = null;
}
catch (LockConflictException e)
{
try
{
- if(cursor != null)
- {
- cursor.close();
- }
- }
- catch(DatabaseException e1)
- {
- _log.warn("Unable to close cursor after
LockConflictException", e1);
- // rethrow the original log conflict exception, the
secondary exception should already have
- // been logged.
- throw e;
- }
- try
- {
if(tx != null)
{
tx.abort();
@@ -1056,13 +1022,8 @@ public class BDBMessageStore implements
{
try
{
- if(cursor != null)
- {
- cursor.close();
- cursor = null;
- }
-
tx.abort();
+ tx = null;
}
catch (DatabaseException e1)
{
@@ -1074,15 +1035,16 @@ public class BDBMessageStore implements
}
finally
{
- if(cursor != null)
+ if (tx != null)
{
try
{
- cursor.close();
+ tx.abort();
+ tx = null;
}
- catch (DatabaseException e)
+ catch (DatabaseException e1)
{
- throw new AMQStoreException("Error closing cursor: " +
e.getMessage(), e);
+ throw new AMQStoreException("Error aborting transaction "
+ e1, e1);
}
}
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]