Author: rgodfrey
Date: Sat Jan 21 19:58:14 2012
New Revision: 1234410

URL: http://svn.apache.org/viewvc?rev=1234410&view=rev
Log:
QPID-3774 : Work around Java BDB issue with cursors and flushLog

Modified:
    
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java

Modified: 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java?rev=1234410&r1=1234409&r2=1234410&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
 Sat Jan 21 19:58:14 2012
@@ -37,6 +37,7 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
+import com.sleepycat.bind.tuple.IntegerBinding;
 import com.sleepycat.bind.tuple.LongBinding;
 import com.sleepycat.bind.tuple.StringBinding;
 import com.sleepycat.je.*;
@@ -899,7 +900,6 @@ public class BDBMessageStore implements 
         boolean complete = false;
         com.sleepycat.je.Transaction tx = null;
 
-        Cursor cursor = null;
         Random rand = null;
         int attempts = 0;
         try
@@ -907,7 +907,6 @@ public class BDBMessageStore implements 
             do
             {
                 tx = null;
-                cursor = null;
                 try
                 {
                     tx = _environment.beginTransaction(null, null);
@@ -936,76 +935,43 @@ public class BDBMessageStore implements 
 
                     //now remove the content data from the store if there is 
any.
 
-                    DatabaseEntry contentKeyEntry = new DatabaseEntry();
-                    MessageContentKey_5 mck = new 
MessageContentKey_5(messageId,0);
-
-                    TupleBinding<MessageContentKey> contentKeyTupleBinding = 
new MessageContentKeyTB_5();
-                    contentKeyTupleBinding.objectToEntry(mck, contentKeyEntry);
 
-                    //Use a partial record for the value to prevent retrieving 
the
-                    //data itself as we only need the key to identify what to 
remove.
-                    DatabaseEntry value = new DatabaseEntry();
-                    value.setPartial(0, 0, true);
 
-                    cursor = _messageContentDb.openCursor(tx, null);
-
-                    status = cursor.getSearchKeyRange(contentKeyEntry, value, 
LockMode.RMW);
-                    while (status == OperationStatus.SUCCESS)
+                    int offset = 0;
+                    do
                     {
-                        mck = (MessageContentKey_5) 
contentKeyTupleBinding.entryToObject(contentKeyEntry);
+                        DatabaseEntry contentKeyEntry = new DatabaseEntry();
+                        MessageContentKey_5 mck = new 
MessageContentKey_5(messageId,offset);
+                        TupleBinding<MessageContentKey> contentKeyTupleBinding 
= new MessageContentKeyTB_5();
+                        contentKeyTupleBinding.objectToEntry(mck, 
contentKeyEntry);
+                        //Use a partial record for the value to prevent 
retrieving the
+                        //data itself as we only need the key to identify what 
to remove.
+                        DatabaseEntry value = new DatabaseEntry();
+                        value.setPartial(0, 4, true);
 
-                        if(mck.getMessageId() != messageId)
-                        {
-                            //we have exhausted all chunks for this message 
id, break
-                            break;
-                        }
-                        else
-                        {
-                            status = cursor.delete();
-
-                            if(status == OperationStatus.NOTFOUND)
-                            {
-                                cursor.close();
-                                cursor = null;
+                        status = _messageContentDb.get(null,contentKeyEntry, 
value, LockMode.READ_COMMITTED);
 
-                                tx.abort();
-                                throw new AMQStoreException("Content chunk 
offset" + mck.getOffset() + " not found for message " + messageId);
-                            }
+                        if(status == OperationStatus.SUCCESS)
+                        {
 
+                            offset += IntegerBinding.entryToInt(value);
+                            _messageContentDb.delete(tx, contentKeyEntry);
                             if (_log.isDebugEnabled())
                             {
                                 _log.debug("Deleted content chunk offset " + 
mck.getOffset() + " for message " + messageId);
                             }
                         }
-
-                        status = cursor.getNext(contentKeyEntry, value, 
LockMode.RMW);
                     }
-
-                    cursor.close();
-
-                    cursor = null;
+                    while (status == OperationStatus.SUCCESS);
 
                     commit(tx, sync);
                     complete = true;
+                    tx = null;
                 }
                 catch (LockConflictException e)
                 {
                     try
                     {
-                        if(cursor != null)
-                        {
-                            cursor.close();
-                        }
-                    }
-                    catch(DatabaseException e1)
-                    {
-                        _log.warn("Unable to close cursor after 
LockConflictException", e1);
-                        // rethrow the original log conflict exception, the 
secondary exception should already have
-                        // been logged.
-                        throw e;
-                    }
-                    try
-                    {
                         if(tx != null)
                         {
                             tx.abort();
@@ -1056,13 +1022,8 @@ public class BDBMessageStore implements 
             {
                 try
                 {
-                    if(cursor != null)
-                    {
-                        cursor.close();
-                        cursor = null;
-                    }
-
                     tx.abort();
+                    tx = null;
                 }
                 catch (DatabaseException e1)
                 {
@@ -1074,15 +1035,16 @@ public class BDBMessageStore implements 
         }
         finally
         {
-            if(cursor != null)
+            if (tx != null)
             {
                 try
                 {
-                    cursor.close();
+                    tx.abort();
+                    tx = null;
                 }
-                catch (DatabaseException e)
+                catch (DatabaseException e1)
                 {
-                    throw new AMQStoreException("Error closing cursor: " + 
e.getMessage(), e);
+                    throw new AMQStoreException("Error aborting transaction " 
+ e1, e1);
                 }
             }
         }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to