Author: rgodfrey
Date: Fri Jan 20 20:27:59 2012
New Revision: 1234111
URL: http://svn.apache.org/viewvc?rev=1234111&view=rev
Log:
QPID-3774 : allow out of order completion of persistent enqueues / dequeues
Added:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
- copied, changed from r1231491,
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
Modified:
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
Modified:
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java?rev=1234111&r1=1234110&r2=1234111&view=diff
==============================================================================
---
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
(original)
+++
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
Fri Jan 20 20:27:59 2012
@@ -30,6 +30,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
+import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -94,6 +95,8 @@ public class BDBMessageStore implements
{
private static final Logger _log = Logger.getLogger(BDBMessageStore.class);
+ private static final int LOCK_RETRY_ATTEMPTS = 5;
+
static final int DATABASE_FORMAT_VERSION = 5;
private static final String DATABASE_FORMAT_VERSION_PROPERTY = "version";
public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path";
@@ -893,91 +896,161 @@ public class BDBMessageStore implements
{
// _log.debug("public void removeMessage(Long messageId = " +
messageId): called");
-
+ boolean complete = false;
com.sleepycat.je.Transaction tx = null;
Cursor cursor = null;
+ Random rand = null;
+ int attempts = 0;
try
{
- tx = _environment.beginTransaction(null, null);
+ do
+ {
+ tx = null;
+ cursor = null;
+ try
+ {
+ tx = _environment.beginTransaction(null, null);
- //remove the message meta data from the store
- DatabaseEntry key = new DatabaseEntry();
- LongBinding.longToEntry(messageId, key);
+ //remove the message meta data from the store
+ DatabaseEntry key = new DatabaseEntry();
+ LongBinding.longToEntry(messageId, key);
- if (_log.isDebugEnabled())
- {
- _log.debug("Removing message id " + messageId);
- }
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Removing message id " + messageId);
+ }
- OperationStatus status = _messageMetaDataDb.delete(tx, key);
- if (status == OperationStatus.NOTFOUND)
- {
- _log.info("Message not found (attempt to remove failed -
probably application initiated rollback) " +
- messageId);
- }
+ OperationStatus status = _messageMetaDataDb.delete(tx,
key);
+ if (status == OperationStatus.NOTFOUND)
+ {
+ _log.info("Message not found (attempt to remove failed
- probably application initiated rollback) " +
+ messageId);
+ }
- if (_log.isDebugEnabled())
- {
- _log.debug("Deleted metadata for message " + messageId);
- }
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Deleted metadata for message " +
messageId);
+ }
- //now remove the content data from the store if there is any.
+ //now remove the content data from the store if there is
any.
- DatabaseEntry contentKeyEntry = new DatabaseEntry();
- MessageContentKey_5 mck = new MessageContentKey_5(messageId,0);
+ DatabaseEntry contentKeyEntry = new DatabaseEntry();
+ MessageContentKey_5 mck = new
MessageContentKey_5(messageId,0);
- TupleBinding<MessageContentKey> contentKeyTupleBinding = new
MessageContentKeyTB_5();
- contentKeyTupleBinding.objectToEntry(mck, contentKeyEntry);
+ TupleBinding<MessageContentKey> contentKeyTupleBinding =
new MessageContentKeyTB_5();
+ contentKeyTupleBinding.objectToEntry(mck, contentKeyEntry);
- //Use a partial record for the value to prevent retrieving the
- //data itself as we only need the key to identify what to remove.
- DatabaseEntry value = new DatabaseEntry();
- value.setPartial(0, 0, true);
+ //Use a partial record for the value to prevent retrieving
the
+ //data itself as we only need the key to identify what to
remove.
+ DatabaseEntry value = new DatabaseEntry();
+ value.setPartial(0, 0, true);
- cursor = _messageContentDb.openCursor(tx, null);
+ cursor = _messageContentDb.openCursor(tx, null);
- status = cursor.getSearchKeyRange(contentKeyEntry, value,
LockMode.RMW);
- while (status == OperationStatus.SUCCESS)
- {
- mck = (MessageContentKey_5)
contentKeyTupleBinding.entryToObject(contentKeyEntry);
+ status = cursor.getSearchKeyRange(contentKeyEntry, value,
LockMode.RMW);
+ while (status == OperationStatus.SUCCESS)
+ {
+ mck = (MessageContentKey_5)
contentKeyTupleBinding.entryToObject(contentKeyEntry);
- if(mck.getMessageId() != messageId)
- {
- //we have exhausted all chunks for this message id, break
- break;
+ if(mck.getMessageId() != messageId)
+ {
+ //we have exhausted all chunks for this message
id, break
+ break;
+ }
+ else
+ {
+ status = cursor.delete();
+
+ if(status == OperationStatus.NOTFOUND)
+ {
+ cursor.close();
+ cursor = null;
+
+ tx.abort();
+ throw new AMQStoreException("Content chunk
offset" + mck.getOffset() + " not found for message " + messageId);
+ }
+
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Deleted content chunk offset " +
mck.getOffset() + " for message " + messageId);
+ }
+ }
+
+ status = cursor.getNext(contentKeyEntry, value,
LockMode.RMW);
+ }
+
+ cursor.close();
+
+ cursor = null;
+
+ commit(tx, sync);
+ complete = true;
}
- else
+ catch (LockConflictException e)
{
- status = cursor.delete();
+ try
+ {
+ if(cursor != null)
+ {
+ cursor.close();
+ }
+ }
+ catch(DatabaseException e1)
+ {
+ _log.warn("Unable to close cursor after
LockConflictException", e1);
+ // rethrow the original lock conflict exception, the
secondary exception should already have
+ // been logged.
+ throw e;
+ }
+ try
+ {
+ if(tx != null)
+ {
+ tx.abort();
+ }
+ }
+ catch(DatabaseException e2)
+ {
+ _log.warn("Unable to abort transaction after
LockConflictExcption", e2);
+ // rethrow the original lock conflict exception, the
secondary exception should already have
+ // been logged.
+ throw e;
+ }
+
- if(status == OperationStatus.NOTFOUND)
+ _log.warn("Lock timeout exception. Retrying (attempt "
+ + (attempts+1) + " of "+ LOCK_RETRY_ATTEMPTS +")
" + e);
+
+ if(++attempts < LOCK_RETRY_ATTEMPTS)
{
- cursor.close();
- cursor = null;
+ if(rand == null)
+ {
+ rand = new Random();
+ }
- tx.abort();
- throw new AMQStoreException("Content chunk offset" +
mck.getOffset() + " not found for message " + messageId);
- }
+ try
+ {
+ Thread.sleep(500l + (long)(500l *
rand.nextDouble()));
+ }
+ catch (InterruptedException e1)
+ {
- if (_log.isDebugEnabled())
+ }
+ }
+ else
{
- _log.debug("Deleted content chunk offset " +
mck.getOffset() + " for message " + messageId);
+ // rethrow the lock conflict exception since we could
not solve by retrying
+ throw e;
}
}
-
- status = cursor.getNext(contentKeyEntry, value, LockMode.RMW);
}
-
- cursor.close();
- cursor = null;
-
- commit(tx, sync);
+ while(!complete);
}
catch (DatabaseException e)
{
- e.printStackTrace();
+ _log.error("Unexpected BDB exception", e);
if (tx != null)
{
@@ -1009,7 +1082,7 @@ public class BDBMessageStore implements
}
catch (DatabaseException e)
{
- throw new AMQStoreException("Error closing database
connection: " + e.getMessage(), e);
+ throw new AMQStoreException("Error closing cursor: " +
e.getMessage(), e);
}
}
}
@@ -2073,7 +2146,7 @@ public class BDBMessageStore implements
{
// RHM-7 Periodically wake up and check, just in
case we
// missed a notification. Don't want to lock the
broker hard.
- _lock.wait(250);
+ _lock.wait(1000);
}
catch (InterruptedException e)
{
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=1234111&r1=1234110&r2=1234111&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
Fri Jan 20 20:27:59 2012
@@ -27,6 +27,7 @@ import java.security.Principal;
import java.text.MessageFormat;
import java.util.Collection;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
@@ -63,7 +64,7 @@ import org.apache.qpid.server.queue.Queu
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.subscription.Subscription_0_10;
-import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -84,18 +85,20 @@ import org.apache.qpid.transport.Session
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ServerSession extends Session implements AuthorizationHolder,
SessionConfig, AMQSessionModel, LogSubject
+public class ServerSession extends Session
+ implements AuthorizationHolder, SessionConfig,
+ AMQSessionModel, LogSubject,
AsyncAutoCommitTransaction.FutureRecorder
{
private static final Logger _logger =
LoggerFactory.getLogger(ServerSession.class);
private static final String NULL_DESTINTATION =
UUID.randomUUID().toString();
private static final int PRODUCER_CREDIT_TOPUP_THRESHOLD = 1 << 30;
+ private static final int UNFINISHED_COMMAND_QUEUE_THRESHOLD = 500;
private final UUID _id;
private ConnectionConfig _connectionConfig;
private long _createTime = System.currentTimeMillis();
private LogActor _actor = GenericActor.getInstance(this);
- private PostEnqueueAction _postEnqueueAction = new PostEnqueueAction();
private final ConcurrentMap<AMQQueue, Boolean> _blockingQueues = new
ConcurrentHashMap<AMQQueue, Boolean>();
@@ -147,7 +150,7 @@ public class ServerSession extends Sessi
{
super(connection, delegate, name, expiry);
_connectionConfig = connConfig;
- _transaction = new AutoCommitTransaction(this.getMessageStore());
+ _transaction = new
AsyncAutoCommitTransaction(this.getMessageStore(),this);
_logSubject = new ChannelLogSubject(this);
_id = getConfigStore().createId();
getConfigStore().addConfiguredObject(this);
@@ -184,16 +187,7 @@ public class ServerSession extends Sessi
invoke(new MessageFlow("",MessageCreditUnit.MESSAGE,
PRODUCER_CREDIT_TOPUP_THRESHOLD));
}
getConnectionModel().registerMessageReceived(message.getSize(),
message.getArrivalTime());
- PostEnqueueAction postTransactionAction;
- if(isTransactional())
- {
- postTransactionAction = new PostEnqueueAction(queues, message) ;
- }
- else
- {
- postTransactionAction = _postEnqueueAction;
- postTransactionAction.setState(queues, message);
- }
+ PostEnqueueAction postTransactionAction = new
PostEnqueueAction(queues, message, isTransactional()) ;
_transaction.enqueue(queues,message, postTransactionAction, 0L);
incrementOutstandingTxnsIfNecessary();
updateTransactionalActivity();
@@ -221,12 +215,12 @@ public class ServerSession extends Sessi
public void accept(RangeSet ranges)
{
dispositionChange(ranges, new MessageDispositionAction()
- {
- public void
performAction(MessageDispositionChangeListener listener)
- {
- listener.onAccept();
- }
- });
+ {
+ public void performAction(MessageDispositionChangeListener
listener)
+ {
+ listener.onAccept();
+ }
+ });
}
@@ -444,10 +438,7 @@ public class ServerSession extends Sessi
public boolean isTransactional()
{
- // this does not look great but there should only be one
"non-transactional"
- // transactional context, while there could be several transactional
ones in
- // theory
- return !(_transaction instanceof AutoCommitTransaction);
+ return _transaction.isTransactional();
}
public boolean inTransaction()
@@ -765,6 +756,7 @@ public class ServerSession extends Sessi
{
subscription_0_10.flushCreditState(false);
}
+ awaitCommandCompletion();
}
private class PostEnqueueAction implements ServerTransaction.Action
@@ -774,17 +766,12 @@ public class ServerSession extends Sessi
private ServerMessage _message;
private final boolean _transactional;
- public PostEnqueueAction(List<? extends BaseQueue> queues,
ServerMessage message)
+ public PostEnqueueAction(List<? extends BaseQueue> queues,
ServerMessage message, final boolean transactional)
{
- _transactional = true;
+ _transactional = transactional;
setState(queues, message);
}
- public PostEnqueueAction()
- {
- _transactional = false;
- }
-
public void setState(List<? extends BaseQueue> queues, ServerMessage
message)
{
_message = message;
@@ -830,4 +817,76 @@ public class ServerSession extends Sessi
{
return _blocking.get();
}
+
+ private final LinkedList<AsyncCommand> _unfinishedCommandsQueue = new
LinkedList<AsyncCommand>();
+
+ public void completeAsyncCommands()
+ {
+ AsyncCommand cmd;
+ while((cmd = _unfinishedCommandsQueue.peek()) != null &&
cmd.isReadyForCompletion())
+ {
+ cmd.complete();
+ _unfinishedCommandsQueue.poll();
+ }
+ while(_unfinishedCommandsQueue.size() >
UNFINISHED_COMMAND_QUEUE_THRESHOLD)
+ {
+ cmd = _unfinishedCommandsQueue.poll();
+ cmd.awaitReadyForCompletion();
+ cmd.complete();
+ }
+ }
+
+
+ public void awaitCommandCompletion()
+ {
+ AsyncCommand cmd;
+ while((cmd = _unfinishedCommandsQueue.poll()) != null)
+ {
+ cmd.awaitReadyForCompletion();
+ cmd.complete();
+ }
+ }
+
+
+ public Object getAsyncCommandMark()
+ {
+ return _unfinishedCommandsQueue.isEmpty() ? null :
_unfinishedCommandsQueue.getLast();
+ }
+
+ public void recordFuture(final MessageStore.StoreFuture future, final
ServerTransaction.Action action)
+ {
+ _unfinishedCommandsQueue.add(new AsyncCommand(future, action));
+ }
+
+ private static class AsyncCommand
+ {
+ private final MessageStore.StoreFuture _future;
+ private ServerTransaction.Action _action;
+
+ public AsyncCommand(final MessageStore.StoreFuture future, final
ServerTransaction.Action action)
+ {
+ _future = future;
+ _action = action;
+ }
+
+ void awaitReadyForCompletion()
+ {
+ _future.waitForCompletion();
+ }
+
+ void complete()
+ {
+ if(!_future.isComplete())
+ {
+ _future.waitForCompletion();
+ }
+ _action.postCommit();
+ _action = null;
+ }
+
+ boolean isReadyForCompletion()
+ {
+ return _future.isComplete();
+ }
+ }
}
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java?rev=1234111&r1=1234110&r2=1234111&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
Fri Jan 20 20:27:59 2012
@@ -21,7 +21,6 @@
package org.apache.qpid.server.transport;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -55,6 +54,7 @@ import org.apache.qpid.server.store.Mess
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.subscription.Subscription_0_10;
+import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.*;
@@ -81,9 +81,22 @@ public class ServerSessionDelegate exten
if(!session.isClosing())
{
- super.command(session, method);
+ Object asyncCommandMark =
((ServerSession)session).getAsyncCommandMark();
+ super.command(session, method, false);
+ Object newOutstanding =
((ServerSession)session).getAsyncCommandMark();
+ if(newOutstanding == null || newOutstanding ==
asyncCommandMark)
+ {
+ session.processed(method);
+ }
+
+ if(newOutstanding != null)
+ {
+ ((ServerSession)session).completeAsyncCommands();
+ }
+
if (method.isSync())
{
+ ((ServerSession)session).awaitCommandCompletion();
session.flushProcessed();
}
}
@@ -98,7 +111,13 @@ public class ServerSessionDelegate exten
@Override
public void messageAccept(Session session, MessageAccept method)
{
- ((ServerSession)session).accept(method.getTransfers());
+ final ServerSession serverSession = (ServerSession) session;
+ serverSession.accept(method.getTransfers());
+ if(!serverSession.isTransactional())
+ {
+ serverSession.recordFuture(MessageStore.IMMEDIATE_FUTURE,
+ new
CommandProcessedAction(serverSession, method));
+ }
}
@Override
@@ -252,7 +271,7 @@ public class ServerSessionDelegate exten
}
@Override
- public void messageTransfer(Session ssn, MessageTransfer xfr)
+ public void messageTransfer(Session ssn, final MessageTransfer xfr)
{
final Exchange exchange = getExchangeForMessage(ssn, xfr);
@@ -294,12 +313,13 @@ public class ServerSessionDelegate exten
exchangeInUse = exchange;
}
+ final ServerSession serverSession = (ServerSession) ssn;
if(!queues.isEmpty())
{
final MessageStore store = getVirtualHost(ssn).getMessageStore();
final StoredMessage<MessageMetaData_0_10> storeMessage =
createStoreMessage(xfr, messageMetaData, store);
- MessageTransferMessage message = new
MessageTransferMessage(storeMessage, ((ServerSession)ssn).getReference());
- ((ServerSession) ssn).enqueue(message, queues);
+ MessageTransferMessage message = new
MessageTransferMessage(storeMessage, serverSession.getReference());
+ serverSession.enqueue(message, queues);
storeMessage.flushToStore();
}
else
@@ -313,13 +333,19 @@ public class ServerSessionDelegate exten
}
else
{
- ((ServerSession)
ssn).getLogActor().message(ExchangeMessages.DISCARDMSG(exchangeInUse.getName(),
messageMetaData.getRoutingKey()));
+
serverSession.getLogActor().message(ExchangeMessages.DISCARDMSG(exchangeInUse.getName(),
messageMetaData.getRoutingKey()));
}
}
-
- ssn.processed(xfr);
+ if(serverSession.isTransactional())
+ {
+ serverSession.processed(xfr);
+ }
+ else
+ {
+ serverSession.recordFuture(MessageStore.IMMEDIATE_FUTURE, new
CommandProcessedAction(serverSession, xfr));
+ }
}
private StoredMessage<MessageMetaData_0_10> createStoreMessage(final
MessageTransfer xfr,
@@ -404,6 +430,13 @@ public class ServerSessionDelegate exten
@Override
+ public void executionSync(final Session ssn, final ExecutionSync sync)
+ {
+ ((ServerSession)ssn).awaitCommandCompletion();
+ super.executionSync(ssn, sync);
+ }
+
+ @Override
public void exchangeDeclare(Session session, ExchangeDeclare method)
{
String exchangeName = method.getExchange();
@@ -1269,4 +1302,25 @@ public class ServerSessionDelegate exten
final ServerConnection scon = (ServerConnection)
session.getConnection();
SecurityManager.setThreadSubject(scon.getAuthorizedSubject());
}
+
+ private static class CommandProcessedAction implements
ServerTransaction.Action
+ {
+ private final ServerSession _serverSession;
+ private final Method _method;
+
+ public CommandProcessedAction(final ServerSession serverSession, final
Method xfr)
+ {
+ _serverSession = serverSession;
+ _method = xfr;
+ }
+
+ public void postCommit()
+ {
+ _serverSession.processed(_method);
+ }
+
+ public void onRollback()
+ {
+ }
+ }
}
Copied:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
(from r1231491,
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java)
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java?p2=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java&r1=1231491&r2=1234111&rev=1234111&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
Fri Jan 20 20:27:59 2012
@@ -20,10 +20,6 @@
*/
package org.apache.qpid.server.txn;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQStoreException;
@@ -32,6 +28,10 @@ import org.apache.qpid.server.message.Se
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStore.StoreFuture;
+
+import java.util.Collection;
+import java.util.List;
/**
* An implementation of ServerTransaction where each enqueue/dequeue
@@ -40,15 +40,23 @@ import org.apache.qpid.server.store.Mess
* Since there is no long-lived transaction, the commit and rollback methods of
* this implementation are empty.
*/
-public class AutoCommitTransaction implements ServerTransaction
+public class AsyncAutoCommitTransaction implements ServerTransaction
{
- protected static final Logger _logger =
Logger.getLogger(AutoCommitTransaction.class);
+ protected static final Logger _logger =
Logger.getLogger(AsyncAutoCommitTransaction.class);
private final MessageStore _messageStore;
+ private final FutureRecorder _futureRecorder;
- public AutoCommitTransaction(MessageStore transactionLog)
+ public static interface FutureRecorder
+ {
+ public void recordFuture(StoreFuture future, Action action);
+
+ }
+
+ public AsyncAutoCommitTransaction(MessageStore transactionLog,
FutureRecorder recorder)
{
_messageStore = transactionLog;
+ _futureRecorder = recorder;
}
public long getTransactionStartTime()
@@ -62,7 +70,8 @@ public class AutoCommitTransaction imple
*/
public void addPostTransactionAction(final Action immediateAction)
{
- immediateAction.postCommit();
+ addFuture(MessageStore.IMMEDIATE_FUTURE, immediateAction);
+
}
public void dequeue(BaseQueue queue, EnqueableMessage message, Action
postTransactionAction)
@@ -70,6 +79,7 @@ public class AutoCommitTransaction imple
MessageStore.Transaction txn = null;
try
{
+ MessageStore.StoreFuture future;
if(message.isPersistent() && queue.isDurable())
{
if (_logger.isDebugEnabled())
@@ -79,10 +89,15 @@ public class AutoCommitTransaction imple
txn = _messageStore.newTransaction();
txn.dequeueMessage(queue, message);
- txn.commitTran();
+ future = txn.commitTranAsync();
+
txn = null;
}
- postTransactionAction.postCommit();
+ else
+ {
+ future = MessageStore.IMMEDIATE_FUTURE;
+ }
+ addFuture(future, postTransactionAction);
postTransactionAction = null;
}
catch (AMQException e)
@@ -97,6 +112,11 @@ public class AutoCommitTransaction imple
}
+ private void addFuture(final MessageStore.StoreFuture future, final Action
action)
+ {
+ _futureRecorder.recordFuture(future, action);
+ }
+
public void dequeue(Collection<QueueEntry> queueEntries, Action
postTransactionAction)
{
MessageStore.Transaction txn = null;
@@ -123,12 +143,17 @@ public class AutoCommitTransaction imple
}
}
+ MessageStore.StoreFuture future;
if(txn != null)
{
- txn.commitTran();
+ future = txn.commitTranAsync();
txn = null;
}
- postTransactionAction.postCommit();
+ else
+ {
+ future = MessageStore.IMMEDIATE_FUTURE;
+ }
+ addFuture(future, postTransactionAction);
postTransactionAction = null;
}
catch (AMQException e)
@@ -149,6 +174,7 @@ public class AutoCommitTransaction imple
MessageStore.Transaction txn = null;
try
{
+ MessageStore.StoreFuture future;
if(message.isPersistent() && queue.isDurable())
{
if (_logger.isDebugEnabled())
@@ -158,10 +184,14 @@ public class AutoCommitTransaction imple
txn = _messageStore.newTransaction();
txn.enqueueMessage(queue, message);
- txn.commitTran();
+ future = txn.commitTranAsync();
txn = null;
}
- postTransactionAction.postCommit();
+ else
+ {
+ future = MessageStore.IMMEDIATE_FUTURE;
+ }
+ addFuture(future, postTransactionAction);
postTransactionAction = null;
}
catch (AMQException e)
@@ -205,13 +235,17 @@ public class AutoCommitTransaction imple
}
}
+ MessageStore.StoreFuture future;
if (txn != null)
{
- txn.commitTran();
+ future = txn.commitTranAsync();
txn = null;
}
-
- postTransactionAction.postCommit();
+ else
+ {
+ future = MessageStore.IMMEDIATE_FUTURE;
+ }
+ addFuture(future, postTransactionAction);
postTransactionAction = null;
@@ -231,7 +265,17 @@ public class AutoCommitTransaction imple
public void commit(final Runnable immediatePostTransactionAction)
{
- immediatePostTransactionAction.run();
+ addFuture(MessageStore.IMMEDIATE_FUTURE, new Action()
+ {
+ public void postCommit()
+ {
+ immediatePostTransactionAction.run();
+ }
+
+ public void onRollback()
+ {
+ }
+ });
}
public void commit()
@@ -242,6 +286,11 @@ public class AutoCommitTransaction imple
{
}
+ public boolean isTransactional()
+ {
+ return false;
+ }
+
private void rollbackIfNecessary(Action postTransactionAction,
MessageStore.Transaction txn)
{
if (txn != null)
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java?rev=1234111&r1=1234110&r2=1234111&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
Fri Jan 20 20:27:59 2012
@@ -242,6 +242,11 @@ public class AutoCommitTransaction imple
{
}
+ public boolean isTransactional()
+ {
+ return false;
+ }
+
private void rollbackIfNecessary(Action postTransactionAction,
MessageStore.Transaction txn)
{
if (txn != null)
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java?rev=1234111&r1=1234110&r2=1234111&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
Fri Jan 20 20:27:59 2012
@@ -309,7 +309,12 @@ public class LocalTransaction implements
private void resetDetails()
{
_transaction = null;
- _postTransactionActions.clear();
+ _postTransactionActions.clear();
_txnStartTime = 0L;
}
+
+ public boolean isTransactional()
+ {
+ return true;
+ }
}
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java?rev=1234111&r1=1234110&r2=1234111&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
Fri Jan 20 20:27:59 2012
@@ -43,6 +43,8 @@ import java.util.List;
*/
public interface ServerTransaction
{
+
+
/**
* Represents an action to be performed on transaction commit or rollback
*/
@@ -110,4 +112,6 @@ public interface ServerTransaction
* be executed immediately after the underlying transaction has
rolled-back.
*/
void rollback();
+
+ boolean isTransactional();
}
Modified:
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java?rev=1234111&r1=1234110&r2=1234111&view=diff
==============================================================================
---
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
(original)
+++
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
Fri Jan 20 20:27:59 2012
@@ -45,10 +45,15 @@ public class SessionDelegate
method.dispatch(ssn, this);
}
- public void command(Session ssn, Method method) {
+ public void command(Session ssn, Method method)
+ {
+ command(ssn, method, !method.hasPayload());
+ }
+ public void command(Session ssn, Method method, boolean processed)
+ {
ssn.identify(method);
method.dispatch(ssn, this);
- if (!method.hasPayload())
+ if (processed)
{
ssn.processed(method);
}
Modified:
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java?rev=1234111&r1=1234110&r2=1234111&view=diff
==============================================================================
---
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java
(original)
+++
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java
Fri Jan 20 20:27:59 2012
@@ -70,6 +70,11 @@ public class AcknowledgeTest extends Qpi
// These should all end up being prefetched by session
sendMessage(_consumerSession, _queue, 1);
+ if(!transacted)
+ {
+ ((AMQSession)_consumerSession).sync();
+ }
+
assertEquals("Wrong number of messages on queue", 1,
((AMQSession)
_consumerSession).getQueueDepth((AMQDestination) _queue));
}
Modified:
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java?rev=1234111&r1=1234110&r2=1234111&view=diff
==============================================================================
---
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
(original)
+++
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
Fri Jan 20 20:27:59 2012
@@ -721,7 +721,7 @@ public class DurableSubscriptionTest ext
msg =
session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2");
msg.setBooleanProperty("Match", false);
producer.send(msg);
-
+ ((AMQSession)session).sync();
// should be 1 or 2 messages on queue now
// (1 for the java broker due to use of server side selectors, and 2
for the cpp broker due to client side selectors only)
AMQQueue queue = new AMQQueue("amq.topic", "clientid" + ":" +
"testResubscribeWithChangedSelectorNoClose");
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]