Author: kwall
Date: Thu Jun 19 12:07:14 2014
New Revision: 1603849

URL: http://svn.apache.org/r1603849
Log:
QPID-5800: [Java Broker] Refactor BDB message store implementation to separate 
message and config store implementations.

* Message store implementations can now be used in isolation, which is useful 
when the user is using a JSON VirtualHostNode with
  a BDB virtual host.

Added:
    
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
    
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
    
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBUtils.java
Modified:
    
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBConfigurationStore.java
    
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHost.java
    
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java
    
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java

Added: 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java?rev=1603849&view=auto
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
 (added)
+++ 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
 Thu Jun 19 12:07:14 2014
@@ -0,0 +1,1219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import static org.apache.qpid.server.store.berkeleydb.BDBUtils.*;
+import static 
org.apache.qpid.server.store.berkeleydb.BDBUtils.abortTransactionSafely;
+import static 
org.apache.qpid.server.store.berkeleydb.BDBUtils.closeCursorSafely;
+import static 
org.apache.qpid.server.store.berkeleydb.BDBUtils.DEFAULT_DATABASE_CONFIG;
+
+import java.lang.ref.SoftReference;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+import com.sleepycat.bind.tuple.ByteBinding;
+import com.sleepycat.bind.tuple.LongBinding;
+import com.sleepycat.je.CheckpointConfig;
+import com.sleepycat.je.Cursor;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.LockConflictException;
+import com.sleepycat.je.LockMode;
+import com.sleepycat.je.OperationStatus;
+import com.sleepycat.je.Sequence;
+import com.sleepycat.je.SequenceConfig;
+import com.sleepycat.je.Transaction;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.store.Event;
+import org.apache.qpid.server.store.EventListener;
+import org.apache.qpid.server.store.EventManager;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.StoredMemoryMessage;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.store.Xid;
+import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction;
+import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey;
+import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.MessageMetaDataBinding;
+import 
org.apache.qpid.server.store.berkeleydb.tuple.PreparedTransactionBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.QueueEntryBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding;
+import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader;
+import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
+import org.apache.qpid.server.store.handler.MessageInstanceHandler;
+
+
+public abstract class AbstractBDBMessageStore implements MessageStore
+{
+
+    private static final int LOCK_RETRY_ATTEMPTS = 5;
+
+    private static final String MESSAGE_META_DATA_DB_NAME = "MESSAGE_METADATA";
+    private static final String MESSAGE_META_DATA_SEQ_DB_NAME = 
"MESSAGE_METADATA.SEQ";
+    private static final String MESSAGE_CONTENT_DB_NAME = "MESSAGE_CONTENT";
+    private static final String DELIVERY_DB_NAME = "QUEUE_ENTRIES";
+
+    //TODO: Add upgrader to remove BRIDGES and LINKS
+    private static final String BRIDGEDB_NAME = "BRIDGES";
+    private static final String LINKDB_NAME = "LINKS";
+    private static final String XID_DB_NAME = "XIDS";
+
+    private final EventManager _eventManager = new EventManager();
+
+    private final DatabaseEntry MESSAGE_METADATA_SEQ_KEY = new 
DatabaseEntry("MESSAGE_METADATA_SEQ_KEY".getBytes(
+            Charset.forName("UTF-8")));
+
+    private final SequenceConfig MESSAGE_METADATA_SEQ_CONFIG = 
SequenceConfig.DEFAULT.
+            setAllowCreate(true).
+            setInitialValue(1).
+            setWrap(true).
+            setCacheSize(100000);
+
+    private boolean _limitBusted;
+    private long _totalStoreSize;
+
+    @Override
+    public void upgradeStoreStructure() throws StoreException
+    {
+        try
+        {
+            new Upgrader(getEnvironmentFacade().getEnvironment(), 
getParent()).upgradeIfNecessary();
+        }
+        catch(DatabaseException e)
+        {
+            throw getEnvironmentFacade().handleDatabaseException("Cannot 
upgrade store", e);
+        }
+
+        // TODO this relies on the fact that the VH will call upgrade just 
before putting the VH into service.
+        _totalStoreSize = getSizeOnDisk();
+    }
+
+    @Override
+    public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T 
metaData)
+    {
+
+        long newMessageId = getNextMessageSequenceNumber();
+
+        if (metaData.isPersistent())
+        {
+            return (StoredMessage<T>) new StoredBDBMessage(newMessageId, 
metaData);
+        }
+        else
+        {
+            return new StoredMemoryMessage<T>(newMessageId, metaData);
+        }
+    }
+
+    private long getNextMessageSequenceNumber()
+    {
+        long newMessageId;
+        try
+        {
+            // The implementations of sequences mean that there is only a 
transaction
+            // after every n sequence values, where n is the 
MESSAGE_METADATA_SEQ_CONFIG.getCacheSize()
+
+            Sequence mmdSeq = 
getEnvironmentFacade().openSequence(getMessageMetaDataSeqDb(),
+                                                              
MESSAGE_METADATA_SEQ_KEY,
+                                                              
MESSAGE_METADATA_SEQ_CONFIG);
+            newMessageId = mmdSeq.get(null, 1);
+        }
+        catch (DatabaseException de)
+        {
+            throw getEnvironmentFacade().handleDatabaseException("Cannot get 
sequence value for new message", de);
+        }
+        return newMessageId;
+    }
+
+    @Override
+    public boolean isPersistent()
+    {
+        return true;
+    }
+
+    @Override
+    public org.apache.qpid.server.store.Transaction newTransaction()
+    {
+        checkMessageStoreOpen();
+
+        return new BDBTransaction();
+    }
+
+    @Override
+    public void addEventListener(final EventListener eventListener, final 
Event... events)
+    {
+        _eventManager.addEventListener(eventListener, events);
+    }
+
+    @Override
+    public void visitMessages(final MessageHandler handler) throws 
StoreException
+    {
+        checkMessageStoreOpen();
+        visitMessagesInternal(handler, getEnvironmentFacade());
+    }
+
+    @Override
+    public void visitMessageInstances(final MessageInstanceHandler handler) 
throws StoreException
+    {
+        checkMessageStoreOpen();
+
+        Cursor cursor = null;
+        List<QueueEntryKey> entries = new ArrayList<QueueEntryKey>();
+        try
+        {
+            cursor = getDeliveryDb().openCursor(null, null);
+            DatabaseEntry key = new DatabaseEntry();
+            QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
+
+            DatabaseEntry value = new DatabaseEntry();
+            while (cursor.getNext(key, value, LockMode.DEFAULT) == 
OperationStatus.SUCCESS)
+            {
+                QueueEntryKey entry = keyBinding.entryToObject(key);
+                entries.add(entry);
+            }
+        }
+        catch (DatabaseException e)
+        {
+            throw getEnvironmentFacade().handleDatabaseException("Cannot visit 
message instances", e);
+        }
+        finally
+        {
+            closeCursorSafely(cursor, getEnvironmentFacade());
+        }
+
+        for(QueueEntryKey entry : entries)
+        {
+            UUID queueId = entry.getQueueId();
+            long messageId = entry.getMessageId();
+            if (!handler.handle(queueId, messageId))
+            {
+                break;
+            }
+        }
+
+    }
+
+    @Override
+    public void visitDistributedTransactions(final 
DistributedTransactionHandler handler) throws StoreException
+    {
+        checkMessageStoreOpen();
+
+        Cursor cursor = null;
+        try
+        {
+            cursor = getXidDb().openCursor(null, null);
+            DatabaseEntry key = new DatabaseEntry();
+            XidBinding keyBinding = XidBinding.getInstance();
+            PreparedTransactionBinding valueBinding = new 
PreparedTransactionBinding();
+            DatabaseEntry value = new DatabaseEntry();
+
+            while (cursor.getNext(key, value, LockMode.RMW) == 
OperationStatus.SUCCESS)
+            {
+                Xid xid = keyBinding.entryToObject(key);
+                PreparedTransaction preparedTransaction = 
valueBinding.entryToObject(value);
+                if (!handler.handle(xid.getFormat(), xid.getGlobalId(), 
xid.getBranchId(),
+                                    preparedTransaction.getEnqueues(), 
preparedTransaction.getDequeues()))
+                {
+                    break;
+                }
+            }
+
+        }
+        catch (DatabaseException e)
+        {
+            throw getEnvironmentFacade().handleDatabaseException("Cannot 
recover distributed transactions", e);
+        }
+        finally
+        {
+            closeCursorSafely(cursor, getEnvironmentFacade());
+        }
+    }
+
+    /**
+     * Retrieves message meta-data.
+     *
+     * @param messageId The message to get the meta-data for.
+     *
+     * @return The message meta data.
+     *
+     * @throws org.apache.qpid.server.store.StoreException If the operation 
fails for any reason, or if the specified message does not exist.
+     */
+    StorableMessageMetaData getMessageMetaData(long messageId) throws 
StoreException
+    {
+        if (getLogger().isDebugEnabled())
+        {
+            getLogger().debug("public MessageMetaData getMessageMetaData(Long 
messageId = "
+                              + messageId + "): called");
+        }
+
+        DatabaseEntry key = new DatabaseEntry();
+        LongBinding.longToEntry(messageId, key);
+        DatabaseEntry value = new DatabaseEntry();
+        MessageMetaDataBinding messageBinding = 
MessageMetaDataBinding.getInstance();
+
+        try
+        {
+            OperationStatus status = getMessageMetaDataDb().get(null, key, 
value, LockMode.READ_UNCOMMITTED);
+            if (status != OperationStatus.SUCCESS)
+            {
+                throw new StoreException("Metadata not found for message with 
id " + messageId);
+            }
+
+            StorableMessageMetaData mdd = messageBinding.entryToObject(value);
+
+            return mdd;
+        }
+        catch (DatabaseException e)
+        {
+            throw getEnvironmentFacade().handleDatabaseException("Error 
reading message metadata for message with id "
+                                                                 + messageId
+                                                                 + ": "
+                                                                 + 
e.getMessage(), e);
+        }
+    }
+
+    void removeMessage(long messageId, boolean sync) throws StoreException
+    {
+        boolean complete = false;
+        Transaction tx = null;
+
+        Random rand = null;
+        int attempts = 0;
+        try
+        {
+            do
+            {
+                tx = null;
+                try
+                {
+                    tx = 
getEnvironmentFacade().getEnvironment().beginTransaction(null, null);
+
+                    //remove the message meta data from the store
+                    DatabaseEntry key = new DatabaseEntry();
+                    LongBinding.longToEntry(messageId, key);
+
+                    if (getLogger().isDebugEnabled())
+                    {
+                        getLogger().debug("Removing message id " + messageId);
+                    }
+
+
+                    OperationStatus status = getMessageMetaDataDb().delete(tx, 
key);
+                    if (status == OperationStatus.NOTFOUND)
+                    {
+                        getLogger().info(
+                                "Message not found (attempt to remove failed - 
probably application initiated rollback) "
+                                +
+                                messageId);
+                    }
+
+                    if (getLogger().isDebugEnabled())
+                    {
+                        getLogger().debug("Deleted metadata for message " + 
messageId);
+                    }
+
+                    //now remove the content data from the store if there is 
any.
+                    DatabaseEntry contentKeyEntry = new DatabaseEntry();
+                    LongBinding.longToEntry(messageId, contentKeyEntry);
+                    getMessageContentDb().delete(tx, contentKeyEntry);
+
+                    if (getLogger().isDebugEnabled())
+                    {
+                        getLogger().debug("Deleted content for message " + 
messageId);
+                    }
+
+                    getEnvironmentFacade().commit(tx, sync);
+
+                    complete = true;
+                    tx = null;
+                }
+                catch (LockConflictException e)
+                {
+                    try
+                    {
+                        if(tx != null)
+                        {
+                            tx.abort();
+                        }
+                    }
+                    catch(DatabaseException e2)
+                    {
+                        getLogger().warn(
+                                "Unable to abort transaction after 
LockConflictExcption on removal of message with id "
+                                + messageId,
+                                e2);
+                        // rethrow the original log conflict exception, the 
secondary exception should already have
+                        // been logged.
+                        throw 
getEnvironmentFacade().handleDatabaseException("Cannot remove message with id "
+                                                                             + 
messageId, e);
+                    }
+
+
+                    getLogger().warn("Lock timeout exception. Retrying 
(attempt "
+                                     + (attempts + 1) + " of " + 
LOCK_RETRY_ATTEMPTS + ") " + e);
+
+                    if(++attempts < LOCK_RETRY_ATTEMPTS)
+                    {
+                        if(rand == null)
+                        {
+                            rand = new Random();
+                        }
+
+                        try
+                        {
+                            Thread.sleep(500l + (long)(500l * 
rand.nextDouble()));
+                        }
+                        catch (InterruptedException e1)
+                        {
+
+                        }
+                    }
+                    else
+                    {
+                        // rethrow the lock conflict exception since we could 
not solve by retrying
+                        throw 
getEnvironmentFacade().handleDatabaseException("Cannot remove messages", e);
+                    }
+                }
+            }
+            while(!complete);
+        }
+        catch (DatabaseException e)
+        {
+            getLogger().error("Unexpected BDB exception", e);
+
+            try
+            {
+                abortTransactionSafely(tx,
+                                       getEnvironmentFacade());
+            }
+            finally
+            {
+                tx = null;
+            }
+
+            throw getEnvironmentFacade().handleDatabaseException("Error 
removing message with id "
+                                                                 + messageId
+                                                                 + " from 
database: "
+                                                                 + 
e.getMessage(), e);
+        }
+        finally
+        {
+            try
+            {
+                abortTransactionSafely(tx,
+                                       getEnvironmentFacade());
+            }
+            finally
+            {
+                tx = null;
+            }
+        }
+    }
+
+
+    /**
+     * Fills the provided ByteBuffer with as much content for the specified 
message as possible, starting
+     * from the specified offset in the message.
+     *
+     * @param messageId The message to get the data for.
+     * @param offset    The offset of the data within the message.
+     * @param dst       The destination of the content read back
+     *
+     * @return The number of bytes inserted into the destination
+     *
+     * @throws org.apache.qpid.server.store.StoreException If the operation 
fails for any reason, or if the specified message does not exist.
+     */
+    int getContent(long messageId, int offset, ByteBuffer dst) throws 
StoreException
+    {
+        DatabaseEntry contentKeyEntry = new DatabaseEntry();
+        LongBinding.longToEntry(messageId, contentKeyEntry);
+        DatabaseEntry value = new DatabaseEntry();
+        ContentBinding contentTupleBinding = ContentBinding.getInstance();
+
+
+        if (getLogger().isDebugEnabled())
+        {
+            getLogger().debug("Message Id: " + messageId + " Getting content 
body from offset: " + offset);
+        }
+
+        try
+        {
+
+            int written = 0;
+            OperationStatus status = getMessageContentDb().get(null, 
contentKeyEntry, value, LockMode.READ_UNCOMMITTED);
+            if (status == OperationStatus.SUCCESS)
+            {
+                byte[] dataAsBytes = contentTupleBinding.entryToObject(value);
+                int size = dataAsBytes.length;
+                if (offset > size)
+                {
+                    throw new RuntimeException("Offset " + offset + " is 
greater than message size " + size
+                                               + " for message id " + 
messageId + "!");
+
+                }
+
+                written = size - offset;
+                if(written > dst.remaining())
+                {
+                    written = dst.remaining();
+                }
+
+                dst.put(dataAsBytes, offset, written);
+            }
+            return written;
+        }
+        catch (DatabaseException e)
+        {
+            throw getEnvironmentFacade().handleDatabaseException("Error 
getting AMQMessage with id "
+                                                                 + messageId
+                                                                 + " to 
database: "
+                                                                 + 
e.getMessage(), e);
+        }
+    }
+
+    private void visitMessagesInternal(MessageHandler handler, 
EnvironmentFacade environmentFacade)
+    {
+        Cursor cursor = null;
+        try
+        {
+            cursor = getMessageMetaDataDb().openCursor(null, null);
+            DatabaseEntry key = new DatabaseEntry();
+            DatabaseEntry value = new DatabaseEntry();
+            MessageMetaDataBinding valueBinding = 
MessageMetaDataBinding.getInstance();
+
+            while (cursor.getNext(key, value, LockMode.RMW) == 
OperationStatus.SUCCESS)
+            {
+                long messageId = LongBinding.entryToLong(key);
+                StorableMessageMetaData metaData = 
valueBinding.entryToObject(value);
+                StoredBDBMessage message = new StoredBDBMessage(messageId, 
metaData, true);
+
+                if (!handler.handle(message))
+                {
+                    break;
+                }
+            }
+        }
+        catch (DatabaseException e)
+        {
+            throw environmentFacade.handleDatabaseException("Cannot visit 
messages", e);
+        }
+        finally
+        {
+            if (cursor != null)
+            {
+                try
+                {
+                    cursor.close();
+                }
+                catch(DatabaseException e)
+                {
+                    throw environmentFacade.handleDatabaseException("Cannot 
close cursor", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Stores a chunk of message data.
+     *
+     * @param tx         The transaction for the operation.
+     * @param messageId       The message to store the data for.
+     * @param offset          The offset of the data chunk in the message.
+     * @param contentBody     The content of the data chunk.
+     *
+     * @throws org.apache.qpid.server.store.StoreException If the operation 
fails for any reason, or if the specified message does not exist.
+     */
+    private void addContent(final Transaction tx, long messageId, int offset,
+                            ByteBuffer contentBody) throws StoreException
+    {
+        DatabaseEntry key = new DatabaseEntry();
+        LongBinding.longToEntry(messageId, key);
+        DatabaseEntry value = new DatabaseEntry();
+        ContentBinding messageBinding = ContentBinding.getInstance();
+        messageBinding.objectToEntry(contentBody.array(), value);
+        try
+        {
+            OperationStatus status = getMessageContentDb().put(tx, key, value);
+            if (status != OperationStatus.SUCCESS)
+            {
+                throw new StoreException("Error adding content for message id 
" + messageId + ": " + status);
+            }
+
+            if (getLogger().isDebugEnabled())
+            {
+                getLogger().debug("Storing content for message " + messageId + 
" in transaction " + tx);
+
+            }
+        }
+        catch (DatabaseException e)
+        {
+            throw getEnvironmentFacade().handleDatabaseException("Error 
writing AMQMessage with id "
+                                                                 + messageId
+                                                                 + " to 
database: "
+                                                                 + 
e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Stores message meta-data.
+     *
+     * @param tx         The transaction for the operation.
+     * @param messageId       The message to store the data for.
+     * @param messageMetaData The message meta data to store.
+     *
+     * @throws org.apache.qpid.server.store.StoreException If the operation 
fails for any reason, or if the specified message does not exist.
+     */
+    private void storeMetaData(final Transaction tx, long messageId,
+                               StorableMessageMetaData messageMetaData)
+            throws StoreException
+    {
+        if (getLogger().isDebugEnabled())
+        {
+            getLogger().debug("storeMetaData called for transaction " + tx
+                              + ", messageId " + messageId
+                              + ", messageMetaData " + messageMetaData);
+        }
+
+        DatabaseEntry key = new DatabaseEntry();
+        LongBinding.longToEntry(messageId, key);
+        DatabaseEntry value = new DatabaseEntry();
+
+        MessageMetaDataBinding messageBinding = 
MessageMetaDataBinding.getInstance();
+        messageBinding.objectToEntry(messageMetaData, value);
+        try
+        {
+            getMessageMetaDataDb().put(tx, key, value);
+            if (getLogger().isDebugEnabled())
+            {
+                getLogger().debug("Storing message metadata for message id " + 
messageId + " in transaction " + tx);
+            }
+        }
+        catch (DatabaseException e)
+        {
+            throw getEnvironmentFacade().handleDatabaseException("Error 
writing message metadata with id "
+                                                                 + messageId
+                                                                 + " to 
database: "
+                                                                 + 
e.getMessage(), e);
+        }
+    }
+
+
+    /**
+     * Places a message onto a specified queue, in a given transaction.
+     *
+     * @param tx   The transaction for the operation.
+     * @param queue     The the queue to place the message on.
+     * @param messageId The message to enqueue.
+     *
+     * @throws org.apache.qpid.server.store.StoreException If the operation 
fails for any reason.
+     */
+    private void enqueueMessage(final Transaction tx, final 
TransactionLogResource queue,
+                                long messageId) throws StoreException
+    {
+
+        DatabaseEntry key = new DatabaseEntry();
+        QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
+        QueueEntryKey dd = new QueueEntryKey(queue.getId(), messageId);
+        keyBinding.objectToEntry(dd, key);
+        DatabaseEntry value = new DatabaseEntry();
+        ByteBinding.byteToEntry((byte) 0, value);
+
+        try
+        {
+            if (getLogger().isDebugEnabled())
+            {
+                getLogger().debug("Enqueuing message " + messageId + " on 
queue "
+                                  + queue.getName() + " with id " + 
queue.getId() + " in transaction " + tx);
+            }
+            getDeliveryDb().put(tx, key, value);
+        }
+        catch (DatabaseException e)
+        {
+            getLogger().error("Failed to enqueue: " + e.getMessage(), e);
+            throw getEnvironmentFacade().handleDatabaseException("Error 
writing enqueued message with id "
+                                                                 + messageId
+                                                                 + " for queue 
"
+                                                                 + 
queue.getName()
+                                                                 + " with id "
+                                                                 + 
queue.getId()
+                                                                 + " to 
database", e);
+        }
+    }
+
+    /**
+     * Extracts a message from a specified queue, in a given transaction.
+     *
+     * @param tx   The transaction for the operation.
+     * @param queue     The queue to take the message from.
+     * @param messageId The message to dequeue.
+     *
+     * @throws org.apache.qpid.server.store.StoreException If the operation 
fails for any reason, or if the specified message does not exist.
+     */
+    private void dequeueMessage(final Transaction tx, final 
TransactionLogResource queue,
+                                long messageId) throws StoreException
+    {
+
+        DatabaseEntry key = new DatabaseEntry();
+        QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
+        QueueEntryKey queueEntryKey = new QueueEntryKey(queue.getId(), 
messageId);
+        UUID id = queue.getId();
+        keyBinding.objectToEntry(queueEntryKey, key);
+        if (getLogger().isDebugEnabled())
+        {
+            getLogger().debug("Dequeue message id " + messageId + " from queue 
"
+                              + queue.getName() + " with id " + id);
+        }
+
+        try
+        {
+
+            OperationStatus status = getDeliveryDb().delete(tx, key);
+            if (status == OperationStatus.NOTFOUND)
+            {
+                throw new StoreException("Unable to find message with id " + 
messageId + " on queue "
+                                         + queue.getName() + " with id "  + 
id);
+            }
+            else if (status != OperationStatus.SUCCESS)
+            {
+                throw new StoreException("Unable to remove message with id " + 
messageId + " on queue"
+                                         + queue.getName() + " with id " + id);
+            }
+
+            if (getLogger().isDebugEnabled())
+            {
+                getLogger().debug("Removed message " + messageId + " on queue "
+                                  + queue.getName() + " with id " + id);
+
+            }
+        }
+        catch (DatabaseException e)
+        {
+
+            getLogger().error("Failed to dequeue message " + messageId + " in 
transaction " + tx, e);
+
+            throw getEnvironmentFacade().handleDatabaseException("Error 
accessing database while dequeuing message: "
+                                                                 + 
e.getMessage(), e);
+        }
+    }
+
+    private void recordXid(Transaction txn,
+                           long format,
+                           byte[] globalId,
+                           byte[] branchId,
+                           org.apache.qpid.server.store.Transaction.Record[] 
enqueues,
+                           org.apache.qpid.server.store.Transaction.Record[] 
dequeues) throws StoreException
+    {
+        DatabaseEntry key = new DatabaseEntry();
+        Xid xid = new Xid(format, globalId, branchId);
+        XidBinding keyBinding = XidBinding.getInstance();
+        keyBinding.objectToEntry(xid,key);
+
+        DatabaseEntry value = new DatabaseEntry();
+        PreparedTransaction preparedTransaction = new 
PreparedTransaction(enqueues, dequeues);
+        PreparedTransactionBinding valueBinding = new 
PreparedTransactionBinding();
+        valueBinding.objectToEntry(preparedTransaction, value);
+
+        try
+        {
+            getXidDb().put(txn, key, value);
+        }
+        catch (DatabaseException e)
+        {
+            getLogger().error("Failed to write xid: " + e.getMessage(), e);
+            throw getEnvironmentFacade().handleDatabaseException("Error 
writing xid to database", e);
+        }
+    }
+
+    private void removeXid(Transaction txn, long format, byte[] globalId, 
byte[] branchId)
+            throws StoreException
+    {
+        DatabaseEntry key = new DatabaseEntry();
+        Xid xid = new Xid(format, globalId, branchId);
+        XidBinding keyBinding = XidBinding.getInstance();
+
+        keyBinding.objectToEntry(xid, key);
+
+
+        try
+        {
+
+            OperationStatus status = getXidDb().delete(txn, key);
+            if (status == OperationStatus.NOTFOUND)
+            {
+                throw new StoreException("Unable to find xid");
+            }
+            else if (status != OperationStatus.SUCCESS)
+            {
+                throw new StoreException("Unable to remove xid");
+            }
+
+        }
+        catch (DatabaseException e)
+        {
+
+            getLogger().error("Failed to remove xid in transaction " + txn, e);
+
+            throw getEnvironmentFacade().handleDatabaseException("Error 
accessing database while removing xid: "
+                                                                 + 
e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Commits all operations performed within a given transaction.
+     *
+     * @param tx The transaction to commit all operations for.
+     *
+     * @throws org.apache.qpid.server.store.StoreException If the operation 
fails for any reason.
+     */
+    private StoreFuture commitTranImpl(final Transaction tx, boolean 
syncCommit) throws StoreException
+    {
+        if (tx == null)
+        {
+            throw new StoreException("Fatal internal error: transactional is 
null at commitTran");
+        }
+
+        StoreFuture result = getEnvironmentFacade().commit(tx, syncCommit);
+
+        if (getLogger().isDebugEnabled())
+        {
+            String transactionType = syncCommit ? "synchronous" : 
"asynchronous";
+            getLogger().debug("commitTranImpl completed " + transactionType + 
" transaction " + tx);
+        }
+
+        return result;
+    }
+
+    /**
+     * Abandons all operations performed within a given transaction.
+     *
+     * @param tx The transaction to abandon.
+     *
+     * @throws org.apache.qpid.server.store.StoreException If the operation 
fails for any reason.
+     */
+    private void abortTran(final Transaction tx) throws StoreException
+    {
+        if (getLogger().isDebugEnabled())
+        {
+            getLogger().debug("abortTran called for transaction " + tx);
+        }
+
+        try
+        {
+            tx.abort();
+        }
+        catch (DatabaseException e)
+        {
+            throw getEnvironmentFacade().handleDatabaseException("Error 
aborting transaction: " + e.getMessage(), e);
+        }
+    }
+
+    private void storedSizeChangeOccurred(final int delta) throws 
StoreException
+    {
+        try
+        {
+            storedSizeChange(delta);
+        }
+        catch(DatabaseException e)
+        {
+            throw getEnvironmentFacade().handleDatabaseException("Stored size 
change exception", e);
+        }
+    }
+
+    private void storedSizeChange(final int delta)
+    {
+        if(getPersistentSizeHighThreshold() > 0)
+        {
+            synchronized (this)
+            {
+                // the delta supplied is an approximation of a store size 
change. we don;t want to check the statistic every
+                // time, so we do so only when there's been enough change that 
it is worth looking again. We do this by
+                // assuming the total size will change by less than twice the 
amount of the message data change.
+                long newSize = _totalStoreSize += 2*delta;
+
+                if(!_limitBusted &&  newSize > 
getPersistentSizeHighThreshold())
+                {
+                    _totalStoreSize = getSizeOnDisk();
+
+                    if(_totalStoreSize > getPersistentSizeHighThreshold())
+                    {
+                        _limitBusted = true;
+                        
_eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL);
+                    }
+                }
+                else if(_limitBusted && newSize < 
getPersistentSizeLowThreshold())
+                {
+                    long oldSize = _totalStoreSize;
+                    _totalStoreSize = getSizeOnDisk();
+
+                    if(oldSize <= _totalStoreSize)
+                    {
+
+                        reduceSizeOnDisk();
+
+                        _totalStoreSize = getSizeOnDisk();
+
+                    }
+
+                    if(_totalStoreSize < getPersistentSizeLowThreshold())
+                    {
+                        _limitBusted = false;
+                        
_eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
+                    }
+
+
+                }
+            }
+        }
+    }
+
+    private void reduceSizeOnDisk()
+    {
+        
getEnvironmentFacade().getEnvironment().getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER,
 "false");
+        boolean cleaned = false;
+        while (getEnvironmentFacade().getEnvironment().cleanLog() > 0)
+        {
+            cleaned = true;
+        }
+        if (cleaned)
+        {
+            CheckpointConfig force = new CheckpointConfig();
+            force.setForce(true);
+            getEnvironmentFacade().getEnvironment().checkpoint(force);
+        }
+
+
+        
getEnvironmentFacade().getEnvironment().getConfig().setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER,
 "true");
+    }
+
+    private long getSizeOnDisk()
+    {
+        return 
getEnvironmentFacade().getEnvironment().getStats(null).getTotalLogSize();
+    }
+
+    private Database getMessageContentDb()
+    {
+        return getEnvironmentFacade().openDatabase(MESSAGE_CONTENT_DB_NAME, 
DEFAULT_DATABASE_CONFIG);
+    }
+
+    private Database getMessageMetaDataDb()
+    {
+        return getEnvironmentFacade().openDatabase(MESSAGE_META_DATA_DB_NAME, 
DEFAULT_DATABASE_CONFIG);
+    }
+
+    private Database getMessageMetaDataSeqDb()
+    {
+        return 
getEnvironmentFacade().openDatabase(MESSAGE_META_DATA_SEQ_DB_NAME, 
DEFAULT_DATABASE_CONFIG);
+    }
+
+    private Database getDeliveryDb()
+    {
+        return getEnvironmentFacade().openDatabase(DELIVERY_DB_NAME, 
DEFAULT_DATABASE_CONFIG);
+    }
+
+    private Database getXidDb()
+    {
+        return getEnvironmentFacade().openDatabase(XID_DB_NAME, 
DEFAULT_DATABASE_CONFIG);
+    }
+
+    protected abstract void checkMessageStoreOpen();
+
+    protected abstract ConfiguredObject<?> getParent();
+
+    protected abstract EnvironmentFacade getEnvironmentFacade();
+
+    protected abstract long getPersistentSizeLowThreshold();
+
+    protected abstract long getPersistentSizeHighThreshold();
+
+    protected abstract Logger getLogger();
+
+    private class StoredBDBMessage<T extends StorableMessageMetaData> 
implements StoredMessage<T>
+    {
+
+        private final long _messageId;
+        private final boolean _isRecovered;
+
+        private T _metaData;
+        private volatile SoftReference<T> _metaDataRef;
+
+        private byte[] _data;
+        private volatile SoftReference<byte[]> _dataRef;
+
+        StoredBDBMessage(long messageId, T metaData)
+        {
+            this(messageId, metaData, false);
+        }
+
+        StoredBDBMessage(long messageId, T metaData, boolean isRecovered)
+        {
+            _messageId = messageId;
+            _isRecovered = isRecovered;
+
+            if(!_isRecovered)
+            {
+                _metaData = metaData;
+            }
+            _metaDataRef = new SoftReference<T>(metaData);
+        }
+
+        @Override
+        public T getMetaData()
+        {
+            T metaData = _metaDataRef.get();
+            if(metaData == null)
+            {
+                checkMessageStoreOpen();
+
+                metaData = (T) getMessageMetaData(_messageId);
+                _metaDataRef = new SoftReference<T>(metaData);
+            }
+
+            return metaData;
+        }
+
+        @Override
+        public long getMessageNumber()
+        {
+            return _messageId;
+        }
+
+        @Override
+        public void addContent(int offsetInMessage, ByteBuffer src)
+        {
+            src = src.slice();
+
+            if(_data == null)
+            {
+                _data = new byte[src.remaining()];
+                _dataRef = new SoftReference<byte[]>(_data);
+                src.duplicate().get(_data);
+            }
+            else
+            {
+                byte[] oldData = _data;
+                _data = new byte[oldData.length + src.remaining()];
+                _dataRef = new SoftReference<byte[]>(_data);
+
+                System.arraycopy(oldData, 0, _data, 0, oldData.length);
+                src.duplicate().get(_data, oldData.length, src.remaining());
+            }
+
+        }
+
+        @Override
+        public int getContent(int offsetInMessage, ByteBuffer dst)
+        {
+            byte[] data = _dataRef == null ? null : _dataRef.get();
+            if(data != null)
+            {
+                int length = Math.min(dst.remaining(), data.length - 
offsetInMessage);
+                dst.put(data, offsetInMessage, length);
+                return length;
+            }
+            else
+            {
+                checkMessageStoreOpen();
+
+                return AbstractBDBMessageStore.this.getContent(_messageId, 
offsetInMessage, dst);
+            }
+        }
+
+        @Override
+        public ByteBuffer getContent(int offsetInMessage, int size)
+        {
+            byte[] data = _dataRef == null ? null : _dataRef.get();
+            if(data != null)
+            {
+                return ByteBuffer.wrap(data,offsetInMessage,size);
+            }
+            else
+            {
+                ByteBuffer buf = ByteBuffer.allocate(size);
+                int length = getContent(offsetInMessage, buf);
+                buf.limit(length);
+                buf.position(0);
+                return  buf;
+            }
+        }
+
+        synchronized void store(Transaction txn)
+        {
+            if (!stored())
+            {
+                try
+                {
+                    _dataRef = new SoftReference<byte[]>(_data);
+                    AbstractBDBMessageStore.this.storeMetaData(txn, 
_messageId, _metaData);
+                    AbstractBDBMessageStore.this.addContent(txn, _messageId, 0,
+                                                    _data == null ? 
ByteBuffer.allocate(0) : ByteBuffer.wrap(_data));
+                }
+                finally
+                {
+                    _metaData = null;
+                    _data = null;
+                }
+            }
+        }
+
+        @Override
+        public synchronized StoreFuture flushToStore()
+        {
+            if(!stored())
+            {
+                checkMessageStoreOpen();
+
+                Transaction txn;
+                try
+                {
+                    txn = 
getEnvironmentFacade().getEnvironment().beginTransaction(
+                            null, null);
+                }
+                catch (DatabaseException e)
+                {
+                    throw 
getEnvironmentFacade().handleDatabaseException("failed to begin transaction", 
e);
+                }
+                store(txn);
+                getEnvironmentFacade().commit(txn, true);
+
+                storedSizeChangeOccurred(getMetaData().getContentSize());
+            }
+            return StoreFuture.IMMEDIATE_FUTURE;
+        }
+
+        @Override
+        public void remove()
+        {
+            checkMessageStoreOpen();
+
+            int delta = getMetaData().getContentSize();
+            removeMessage(_messageId, false);
+            storedSizeChangeOccurred(-delta);
+        }
+
+        private boolean stored()
+        {
+            return _metaData == null || _isRecovered;
+        }
+
+        @Override
+        public String toString()
+        {
+            return this.getClass() + "[messageId=" + _messageId + "]";
+        }
+    }
+
+
+    private class BDBTransaction implements 
org.apache.qpid.server.store.Transaction
+    {
+        private Transaction _txn;
+        private int _storeSizeIncrease;
+
+        private BDBTransaction() throws StoreException
+        {
+            try
+            {
+                _txn = 
getEnvironmentFacade().getEnvironment().beginTransaction(null, null);
+            }
+            catch(DatabaseException e)
+            {
+                throw getEnvironmentFacade().handleDatabaseException("Cannot 
create store transaction", e);
+            }
+        }
+
+        @Override
+        public void enqueueMessage(TransactionLogResource queue, 
EnqueueableMessage message) throws StoreException
+        {
+            checkMessageStoreOpen();
+
+            if(message.getStoredMessage() instanceof StoredBDBMessage)
+            {
+                final StoredBDBMessage storedMessage = (StoredBDBMessage) 
message.getStoredMessage();
+                storedMessage.store(_txn);
+                _storeSizeIncrease += 
storedMessage.getMetaData().getContentSize();
+            }
+
+            AbstractBDBMessageStore.this.enqueueMessage(_txn, queue, 
message.getMessageNumber());
+        }
+
+        @Override
+        public void dequeueMessage(TransactionLogResource queue, 
EnqueueableMessage message) throws StoreException
+        {
+            checkMessageStoreOpen();
+
+            AbstractBDBMessageStore.this.dequeueMessage(_txn, queue, 
message.getMessageNumber());
+        }
+
+        @Override
+        public void commitTran() throws StoreException
+        {
+            checkMessageStoreOpen();
+
+            AbstractBDBMessageStore.this.commitTranImpl(_txn, true);
+            
AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease);
+        }
+
+        @Override
+        public StoreFuture commitTranAsync() throws StoreException
+        {
+            checkMessageStoreOpen();
+
+            
AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease);
+            return AbstractBDBMessageStore.this.commitTranImpl(_txn, false);
+        }
+
+        @Override
+        public void abortTran() throws StoreException
+        {
+            checkMessageStoreOpen();
+
+            AbstractBDBMessageStore.this.abortTran(_txn);
+        }
+
+        @Override
+        public void removeXid(long format, byte[] globalId, byte[] branchId) 
throws StoreException
+        {
+            checkMessageStoreOpen();
+
+            AbstractBDBMessageStore.this.removeXid(_txn, format, globalId, 
branchId);
+        }
+
+        @Override
+        public void recordXid(long format, byte[] globalId, byte[] branchId, 
Record[] enqueues,
+                              Record[] dequeues) throws StoreException
+        {
+            checkMessageStoreOpen();
+
+            AbstractBDBMessageStore.this.recordXid(_txn, format, globalId, 
branchId, enqueues, dequeues);
+        }
+    }
+
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to