Repository: qpid-broker-j Updated Branches: refs/heads/master 4e6a665df -> 20a23dc68
QPID-7639: [AMQP 1.0] Implement large transaction guard honouring "connection.maxUncommittedInMemorySize" and flowing messages to disk on breaching threshold Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/20a23dc6 Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/20a23dc6 Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/20a23dc6 Branch: refs/heads/master Commit: 20a23dc68714c3635028378e10e12d598c5cc1b5 Parents: 4e6a665 Author: Alex Rudyy <[email protected]> Authored: Tue Apr 25 16:46:56 2017 +0100 Committer: Alex Rudyy <[email protected]> Committed: Tue Apr 25 17:09:10 2017 +0100 ---------------------------------------------------------------------- .../server/session/AbstractAMQPSession.java | 7 +- .../qpid/server/txn/LocalTransaction.java | 101 ++++++++++++++++++- .../txn/FlowToDiskMessageObserverTest.java | 87 ++++++++++++++++ .../qpid/server/txn/LocalTransactionTest.java | 20 ++++ .../server/protocol/v0_10/ServerSession.java | 54 +--------- .../server/protocol/v0_10/Session_0_10.java | 5 - .../qpid/server/protocol/v0_8/AMQChannel.java | 59 ++--------- .../protocol/v1_0/AMQPConnection_1_0.java | 2 +- .../protocol/v1_0/AMQPConnection_1_0Impl.java | 9 +- .../TxnCoordinatorReceivingLinkEndpoint.java | 9 +- 10 files changed, 234 insertions(+), 119 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/20a23dc6/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java b/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java index 2410ef9..5f61e7d 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java +++ 
b/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java @@ -81,7 +81,7 @@ public abstract class AbstractAMQPSession<S extends AbstractAMQPSession<S, X>, protected final SecurityToken _token; protected final PublishAuthorisationCache _publishAuthCache; - protected final long _maxUncommittedInMemorySize; + private final long _maxUncommittedInMemorySize; protected final LogSubject _logSubject; @@ -411,6 +411,11 @@ public abstract class AbstractAMQPSession<S extends AbstractAMQPSession<S, X>, } } + public long getMaxUncommittedInMemorySize() + { + return _maxUncommittedInMemorySize; + } + protected abstract void updateBlockedStateIfNecessary(); public abstract boolean isClosing(); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/20a23dc6/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java b/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java index 9c96083..1d4e7b5 100755 --- a/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java +++ b/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java @@ -29,11 +29,16 @@ import com.google.common.util.concurrent.ListenableFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.qpid.server.logging.EventLogger; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.store.MessageEnqueueRecord; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StorableMessageMetaData; +import org.apache.qpid.server.store.StoredMessage; import 
org.apache.qpid.server.store.Transaction; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.util.ServerScopedRuntimeException; @@ -48,7 +53,8 @@ public class LocalTransaction implements ServerTransaction { protected static final Logger _logger = LoggerFactory.getLogger(LocalTransaction.class); - private final List<Action> _postTransactionActions = new ArrayList<Action>(); + private final List<Action> _postTransactionActions = new ArrayList<>(); + private final MessageObserver _messageObserver; private volatile Transaction _transaction; private final ActivityTimeAccessor _activityTime; @@ -60,6 +66,11 @@ public class LocalTransaction implements ServerTransaction public LocalTransaction(MessageStore transactionLog) { + this(transactionLog, MessageObserver.NOOP_MESSAGE_OBSERVER); + } + + public LocalTransaction(MessageStore transactionLog, MessageObserver messageObserver) + { this(transactionLog, new ActivityTimeAccessor() { @Override @@ -67,13 +78,16 @@ public class LocalTransaction implements ServerTransaction { return System.currentTimeMillis(); } - }); + }, messageObserver); } - public LocalTransaction(MessageStore transactionLog, ActivityTimeAccessor activityTime) + public LocalTransaction(MessageStore transactionLog, + ActivityTimeAccessor activityTime, + MessageObserver messageObserver) { _transactionLog = transactionLog; _activityTime = activityTime; + _messageObserver = messageObserver; } @Override @@ -262,6 +276,7 @@ public class LocalTransaction implements ServerTransaction }); } } + _messageObserver.onMessageEnqueue(message); } public void enqueue(Collection<? 
extends BaseQueue> queues, EnqueueableMessage message, EnqueueAction postTransactionAction) @@ -332,6 +347,7 @@ public class LocalTransaction implements ServerTransaction } tidyUpOnError(e); } + _messageObserver.onMessageEnqueue(message); } public void commit() @@ -513,6 +529,7 @@ public class LocalTransaction implements ServerTransaction private void resetDetails() { + _messageObserver.reset(); _asyncTran = null; _transaction = null; _postTransactionActions.clear(); @@ -540,4 +557,82 @@ public class LocalTransaction implements ServerTransaction { return _isRollbackOnly; } + + + public interface MessageObserver + { + MessageObserver NOOP_MESSAGE_OBSERVER = new NoopMessageObserver(); + + void onMessageEnqueue(EnqueueableMessage<? extends StorableMessageMetaData> message); + + void reset(); + } + + private static class NoopMessageObserver implements MessageObserver + { + @Override + public void onMessageEnqueue(final EnqueueableMessage<? extends StorableMessageMetaData> message) + { + // noop + } + + @Override + public void reset() + { + // noop + } + } + + public static class FlowToDiskMessageObserver implements MessageObserver + { + private volatile long _uncommittedMessageSize; + private final List<StoredMessage<? extends StorableMessageMetaData>> _uncommittedMessages = new ArrayList<>(); + private final LogSubject _logSubject; + private final EventLogger _eventLogger; + private final long _maxUncommittedInMemorySize; + + public FlowToDiskMessageObserver(final long maxUncommittedInMemorySize, + final LogSubject logSubject, + final EventLogger eventLogger) + { + _logSubject = logSubject; + _eventLogger = eventLogger; + _maxUncommittedInMemorySize = maxUncommittedInMemorySize; + } + + @Override + public void onMessageEnqueue(final EnqueueableMessage<? extends StorableMessageMetaData> message) + { + StoredMessage<? 
extends StorableMessageMetaData> handle = message.getStoredMessage(); + _uncommittedMessageSize += handle.getContentSize(); + if (_uncommittedMessageSize > _maxUncommittedInMemorySize) + { + handle.flowToDisk(); + if(!_uncommittedMessages.isEmpty() || _uncommittedMessageSize == handle.getContentSize()) + { + _eventLogger.message(_logSubject, ChannelMessages.LARGE_TRANSACTION_WARN(_uncommittedMessageSize)); + } + + if(!_uncommittedMessages.isEmpty()) + { + for (StoredMessage<? extends StorableMessageMetaData> uncommittedHandle : _uncommittedMessages) + { + uncommittedHandle.flowToDisk(); + } + _uncommittedMessages.clear(); + } + } + else + { + _uncommittedMessages.add(handle); + } + } + + @Override + public void reset() + { + _uncommittedMessageSize = 0L; + _uncommittedMessages.clear(); + } + } } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/20a23dc6/broker-core/src/test/java/org/apache/qpid/server/txn/FlowToDiskMessageObserverTest.java ---------------------------------------------------------------------- diff --git a/broker-core/src/test/java/org/apache/qpid/server/txn/FlowToDiskMessageObserverTest.java b/broker-core/src/test/java/org/apache/qpid/server/txn/FlowToDiskMessageObserverTest.java new file mode 100644 index 0000000..b402e9c --- /dev/null +++ b/broker-core/src/test/java/org/apache/qpid/server/txn/FlowToDiskMessageObserverTest.java @@ -0,0 +1,87 @@ +package org.apache.qpid.server.txn; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.apache.qpid.server.logging.EventLogger; +import org.apache.qpid.server.logging.LogMessage; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.message.EnqueueableMessage; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.test.utils.QpidTestCase; + 
+public class FlowToDiskMessageObserverTest extends QpidTestCase +{ + private static final int MAX_UNCOMMITTED_IN_MEMORY_SIZE = 100; + private LocalTransaction.FlowToDiskMessageObserver _flowToDiskMessageObserver; + private EventLogger _eventLogger ; + private LogSubject _logSubject; + + @Override + public void setUp() throws Exception + { + super.setUp(); + _eventLogger = mock(EventLogger.class); + _logSubject = mock(LogSubject.class); + _flowToDiskMessageObserver = new LocalTransaction.FlowToDiskMessageObserver(MAX_UNCOMMITTED_IN_MEMORY_SIZE, + _logSubject, + _eventLogger); + } + + public void testOnMessageEnqueue() throws Exception + { + EnqueueableMessage<?> message1 = createMessage(MAX_UNCOMMITTED_IN_MEMORY_SIZE); + EnqueueableMessage<?> message2 = createMessage(1); + EnqueueableMessage<?> message3 = createMessage(1); + + _flowToDiskMessageObserver.onMessageEnqueue(message1); + + StoredMessage handle1 = message1.getStoredMessage(); + verify(handle1, never()).flowToDisk(); + verify(_eventLogger, never()).message(same(_logSubject), any(LogMessage.class)); + + _flowToDiskMessageObserver.onMessageEnqueue(message2); + + StoredMessage handle2 = message2.getStoredMessage(); + verify(handle1).flowToDisk(); + verify(handle2).flowToDisk(); + verify(_eventLogger).message(same(_logSubject), any(LogMessage.class)); + + _flowToDiskMessageObserver.onMessageEnqueue(message3); + + StoredMessage handle3 = message2.getStoredMessage(); + verify(handle1).flowToDisk(); + verify(handle2).flowToDisk(); + verify(handle3).flowToDisk(); + verify(_eventLogger).message(same(_logSubject), any(LogMessage.class)); + } + + public void testReset() throws Exception + { + EnqueueableMessage<?> message1 = createMessage(MAX_UNCOMMITTED_IN_MEMORY_SIZE); + EnqueueableMessage<?> message2 = createMessage(1); + + _flowToDiskMessageObserver.onMessageEnqueue(message1); + _flowToDiskMessageObserver.reset(); + _flowToDiskMessageObserver.onMessageEnqueue(message2); + + StoredMessage handle1 = 
message1.getStoredMessage(); + StoredMessage handle2 = message2.getStoredMessage(); + verify(handle1, never()).flowToDisk(); + verify(handle2, never()).flowToDisk(); + verify(_eventLogger, never()).message(same(_logSubject), any(LogMessage.class)); + } + + private EnqueueableMessage<?> createMessage(int size) + { + EnqueueableMessage message = mock(EnqueueableMessage.class); + StoredMessage handle = mock(StoredMessage.class); + when(message.getStoredMessage()).thenReturn(handle); + when(handle.getContentSize()).thenReturn(size); + return message; + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/20a23dc6/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java ---------------------------------------------------------------------- diff --git a/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java b/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java index 4cc20a6..b4db5c1 100644 --- a/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java +++ b/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java @@ -21,6 +21,8 @@ package org.apache.qpid.server.txn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.util.ArrayList; @@ -602,6 +604,24 @@ public class LocalTransactionTest extends QpidTestCase assertEquals("Transaction update time should be reset after rollback", 0, _transaction.getTransactionUpdateTime()); } + public void testEnqueueInvokesMessageObserver() throws Exception + { + + final LocalTransaction.MessageObserver messageObserver = mock(LocalTransaction.MessageObserver.class); + _transaction = new LocalTransaction(_transactionLog, messageObserver); + + _message = createTestMessage(true); + _queues = createTestBaseQueues(new boolean[] {false, true, false, true}); + + _transaction.enqueue(_queues, 
_message, _action1); + + verify(messageObserver).onMessageEnqueue(_message); + + _transaction.enqueue(createQueue(true), _message, _action1); + + verify(messageObserver, times(2)).onMessageEnqueue(_message); + } + private Collection<MessageInstance> createTestQueueEntries(boolean[] queueDurableFlags, boolean[] messagePersistentFlags) { Collection<MessageInstance> queueEntries = new ArrayList<MessageInstance>(); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/20a23dc6/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index afc69ec..23fcb13 100644 --- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -86,7 +86,6 @@ import org.apache.qpid.server.protocol.v0_10.transport.*; import org.apache.qpid.server.protocol.v0_10.transport.Xid; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreException; -import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.transport.AMQPConnection; import org.apache.qpid.server.protocol.v0_10.transport.Frame; import org.apache.qpid.server.txn.*; @@ -156,9 +155,6 @@ public class ServerSession extends SessionInvoker private final CopyOnWriteArrayList<Consumer<?, ConsumerTarget_0_10>> _consumers = new CopyOnWriteArrayList<>(); private AtomicReference<LogMessage> _forcedCloseLogMessage = new AtomicReference<LogMessage>(); - private volatile long _uncommittedMessageSize; - - private final List<StoredMessage<MessageMetaData_0_10>> _uncommittedMessages = new 
ArrayList<>(); public ServerSession(ServerConnection connection, ServerSessionDelegate delegate, Binary name, long expiry) { @@ -969,47 +965,9 @@ public class ServerSession extends SessionInvoker int enqueues = result.send(_transaction, null); getAMQPConnection().registerMessageReceived(message.getSize(), message.getArrivalTime()); incrementOutstandingTxnsIfNecessary(); - incrementUncommittedMessageSize(message.getStoredMessage()); return enqueues; } - private void resetUncommittedMessages() - { - _uncommittedMessageSize = 0l; - _uncommittedMessages.clear(); - } - - - private void incrementUncommittedMessageSize(final StoredMessage<MessageMetaData_0_10> handle) - { - if (isTransactional() && !(_transaction instanceof DistributedTransaction)) - { - _uncommittedMessageSize += handle.getContentSize(); - if (_uncommittedMessageSize > getMaxUncommittedInMemorySize()) - { - handle.flowToDisk(); - if(!_uncommittedMessages.isEmpty() || _uncommittedMessageSize == handle.getContentSize()) - { - getAMQPConnection().getEventLogger() - .message(getLogSubject(), ChannelMessages.LARGE_TRANSACTION_WARN(_uncommittedMessageSize)); - } - - if(!_uncommittedMessages.isEmpty()) - { - for (StoredMessage<MessageMetaData_0_10> uncommittedHandle : _uncommittedMessages) - { - uncommittedHandle.flowToDisk(); - } - _uncommittedMessages.clear(); - } - } - else - { - _uncommittedMessages.add(handle); - } - } - } - public void sendMessage(MessageTransfer xfr, Runnable postIdSettingAction) { @@ -1270,7 +1228,10 @@ public class ServerSession extends SessionInvoker public void selectTx() { - _transaction = new LocalTransaction(this.getMessageStore()); + _transaction = new LocalTransaction(this.getMessageStore(), + new LocalTransaction.FlowToDiskMessageObserver(_modelObject.getMaxUncommittedInMemorySize(), + getLogSubject(), + getAMQPConnection().getEventLogger())); _txnStarts.incrementAndGet(); } @@ -1377,7 +1338,6 @@ public class ServerSession extends SessionInvoker _txnCommits.incrementAndGet(); 
_txnStarts.incrementAndGet(); decrementOutstandingTxnsIfNecessary(); - resetUncommittedMessages(); } public void rollback() @@ -1387,7 +1347,6 @@ public class ServerSession extends SessionInvoker _txnRejects.incrementAndGet(); _txnStarts.incrementAndGet(); decrementOutstandingTxnsIfNecessary(); - resetUncommittedMessages(); } @@ -1818,11 +1777,6 @@ public class ServerSession extends SessionInvoker AMQPConnection.CloseReason.TRANSACTION_TIMEOUT, reason); } - public final long getMaxUncommittedInMemorySize() - { - return _modelObject.getMaxUncommittedInMemorySize(); - } - private class ResultFuture<T> implements Future<T> { http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/20a23dc6/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java index ae69352..649d290 100644 --- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java +++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java @@ -196,9 +196,4 @@ public class Session_0_10 extends AbstractAMQPSession<Session_0_10, ConsumerTarg { return _serverSession; } - - long getMaxUncommittedInMemorySize() - { - return _maxUncommittedInMemorySize; - } } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/20a23dc6/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java 
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 84ae994..67b3406 100644 --- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -183,8 +183,6 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0 private long _blockingTimeout; private boolean _confirmOnPublish; private long _confirmedMessageCounter; - private volatile long _uncommittedMessageSize; - private final List<StoredMessage<MessageMetaData>> _uncommittedMessages = new ArrayList<>(); private boolean _wireBlockingState; @@ -287,14 +285,12 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0 /** Sets this channel to be part of a local transaction */ private void setLocalTransactional() { - _transaction = new LocalTransaction(_messageStore, new ActivityTimeAccessor() - { - @Override - public long getActivityTime() - { - return _connection.getLastReadTime(); - } - }); + _transaction = new LocalTransaction(_messageStore, + () -> _connection.getLastReadTime(), + new LocalTransaction.FlowToDiskMessageObserver( + getMaxUncommittedInMemorySize(), + _logSubject, + getEventLogger())); _txnStarts.incrementAndGet(); } @@ -473,7 +469,6 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0 .createBasicAckBody(_confirmedMessageCounter, false); _connection.writeFrame(responseBody.generateFrame(_channelId)); } - incrementUncommittedMessageSize(storedMessage); incrementOutstandingTxnsIfNecessary(); } @@ -503,35 +498,6 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0 } - private void incrementUncommittedMessageSize(final StoredMessage<MessageMetaData> handle) - { - if (isTransactional()) - { - _uncommittedMessageSize += handle.getContentSize(); - if (_uncommittedMessageSize > 
getMaxUncommittedInMemorySize()) - { - handle.flowToDisk(); - if(!_uncommittedMessages.isEmpty() || _uncommittedMessageSize == handle.getContentSize()) - { - messageWithSubject(ChannelMessages.LARGE_TRANSACTION_WARN(_uncommittedMessageSize)); - } - - if(!_uncommittedMessages.isEmpty()) - { - for (StoredMessage<MessageMetaData> uncommittedHandle : _uncommittedMessages) - { - uncommittedHandle.flowToDisk(); - } - _uncommittedMessages.clear(); - } - } - else - { - _uncommittedMessages.add(handle); - } - } - } - /** * Either throws a {@link AMQConnectionException} or returns the message * @@ -1193,13 +1159,6 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0 _txnStarts.incrementAndGet(); decrementOutstandingTxnsIfNecessary(); } - resetUncommittedMessages(); - } - - private void resetUncommittedMessages() - { - _uncommittedMessageSize = 0l; - _uncommittedMessages.clear(); } private void rollback(Runnable postRollbackTask) @@ -1221,7 +1180,6 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0 _txnRejects.incrementAndGet(); _txnStarts.incrementAndGet(); decrementOutstandingTxnsIfNecessary(); - resetUncommittedMessages(); } postRollbackTask.run(); @@ -1324,11 +1282,6 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0 return _currentMessage != null; } - private long getMaxUncommittedInMemorySize() - { - return _maxUncommittedInMemorySize; - } - public boolean isChannelFlow() { return _channelFlow; http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/20a23dc6/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java index 
c75bd2f..b2ab892 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java @@ -74,7 +74,7 @@ public interface AMQPConnection_1_0<C extends AMQPConnection_1_0<C>> extends AMQ void close(Error error); Iterator<IdentifiedTransaction> getOpenTransactions(); - IdentifiedTransaction createLocalTransaction(); + IdentifiedTransaction createLocalTransaction(final Session_1_0 transactionSession); ServerTransaction getTransaction(int txnId); void removeTransaction(int txnId); } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/20a23dc6/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java index 7be265d..7fa95d0 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java @@ -1837,7 +1837,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio } @Override - public IdentifiedTransaction createLocalTransaction() + public IdentifiedTransaction createLocalTransaction(final Session_1_0 transactionSession) { ServerTransaction[] openTransactions = _openTransactions; final int maxOpenTransactions = openTransactions.length; @@ -1862,7 +1862,12 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio } final LocalTransaction serverTransaction = - new LocalTransaction(getAddressSpace().getMessageStore()); + new 
LocalTransaction(getAddressSpace().getMessageStore(), + new LocalTransaction.FlowToDiskMessageObserver( + transactionSession.getMaxUncommittedInMemorySize(), + transactionSession.getLogSubject(), + transactionSession.getEventLogger())); + _openTransactions[id] = serverTransaction; return new IdentifiedTransaction(id, serverTransaction); } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/20a23dc6/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java index 6b9ee81..a2f4d2d 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java @@ -119,16 +119,17 @@ public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEn amqpValueSectionFound = true; Object command = section.getValue(); + Session_1_0 session = getSession(); if(command instanceof Declare) { - final IdentifiedTransaction txn = getSession().getConnection().createLocalTransaction(); + final IdentifiedTransaction txn = session.getConnection().createLocalTransaction(session); _createdTransactions.put(txn.getId(), txn.getServerTransaction()); Declared state = new Declared(); - getSession().incrementStartedTransactions(); + session.incrementStartedTransactions(); - state.setTxnId(getSession().integerToBinary(txn.getId())); + state.setTxnId(session.integerToBinary(txn.getId())); updateDisposition(deliveryTag, state, true); } @@ -136,7 +137,7 @@ public class 
TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEn { Discharge discharge = (Discharge) command; - final Error error = discharge(getSession().binaryToInteger(discharge.getTxnId()), + final Error error = discharge(session.binaryToInteger(discharge.getTxnId()), Boolean.TRUE.equals(discharge.getFail())); updateDisposition(deliveryTag, error == null ? new Accepted() : null, true); return error; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
