Repository: qpid-broker-j Updated Branches: refs/heads/7.0.x 179cf52d5 -> 8e874ce58
QPID-8091: [Broker-J] [AMQP 1.0] Add store transaction timeout feature (cherry picked from commit ffd5ad0d456532fb6c9b0ba4e28297c3452bf32c. Merge conflicts resolved manually.) Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/24df0ac1 Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/24df0ac1 Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/24df0ac1 Branch: refs/heads/7.0.x Commit: 24df0ac1f37d22a98d89303b95f44cd4d03be2d8 Parents: 179cf52 Author: Alex Rudyy <oru...@apache.org> Authored: Tue Feb 6 15:48:36 2018 +0000 Committer: Alex Rudyy <oru...@apache.org> Committed: Mon Feb 19 17:50:45 2018 +0000 ---------------------------------------------------------------------- .../apache/qpid/server/session/AMQPSession.java | 2 - .../server/session/AbstractAMQPSession.java | 125 ------------------- .../qpid/server/transport/AMQPConnection.java | 6 + .../transport/AbstractAMQPConnection.java | 76 +++++++++++ .../server/protocol/v0_10/ServerSession.java | 25 ++-- .../server/protocol/v0_10/Session_0_10.java | 6 - .../qpid/server/protocol/v0_8/AMQChannel.java | 28 +++-- .../protocol/v1_0/AMQPConnection_1_0Impl.java | 7 ++ .../qpid/server/protocol/v1_0/Session_1_0.java | 6 - .../TxnCoordinatorReceivingLinkEndpoint.java | 45 +++++-- 10 files changed, 155 insertions(+), 171 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/24df0ac1/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java b/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java index 1b224f1..d68592b 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java +++ 
b/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java @@ -45,8 +45,6 @@ public interface AMQPSession<S extends org.apache.qpid.server.session.AMQPSessio LogSubject getLogSubject(); - void doTimeoutAction(String reason); - void block(Queue<?> queue); void unblock(Queue<?> queue); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/24df0ac1/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java b/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java index b68ed7f..6661de1 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java +++ b/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java @@ -21,7 +21,6 @@ package org.apache.qpid.server.session; import java.security.AccessControlContext; -import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -32,7 +31,6 @@ import java.util.concurrent.atomic.AtomicInteger; import javax.security.auth.Subject; -import com.google.common.base.Supplier; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; @@ -51,25 +49,20 @@ import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Connection; import org.apache.qpid.server.model.LifetimePolicy; -import org.apache.qpid.server.model.NamedAddressSpace; import org.apache.qpid.server.model.Session; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.StateTransition; import org.apache.qpid.server.protocol.PublishAuthorisationCache; import org.apache.qpid.server.security.SecurityToken; import 
org.apache.qpid.server.transport.AMQPConnection; -import org.apache.qpid.server.transport.TransactionTimeoutTicker; import org.apache.qpid.server.transport.network.Ticker; import org.apache.qpid.server.util.Action; -import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost; public abstract class AbstractAMQPSession<S extends AbstractAMQPSession<S, X>, X extends ConsumerTarget<X>> extends AbstractConfiguredObject<S> implements AMQPSession<S, X>, EventLoggerProvider { - private static final String OPEN_TRANSACTION_TIMEOUT_ERROR = "Open transaction timed out"; - private static final String IDLE_TRANSACTION_TIMEOUT_ERROR = "Idle transaction timed out"; private final Action _deleteModelTask; private final AMQPConnection<?> _connection; private final int _sessionId; @@ -144,13 +137,6 @@ public abstract class AbstractAMQPSession<S extends AbstractAMQPSession<S, X>, } @Override - protected void postResolveChildren() - { - super.postResolveChildren(); - registerTransactionTimeoutTickers(_connection); - } - - @Override public int getChannelId() { return _sessionId; @@ -201,117 +187,6 @@ public abstract class AbstractAMQPSession<S extends AbstractAMQPSession<S, X>, return _connection.getEventLogger(); } - private void registerTransactionTimeoutTickers(Connection<?> amqpConnection) - { - NamedAddressSpace addressSpace = amqpConnection.getAddressSpace(); - if (addressSpace instanceof QueueManagingVirtualHost) - { - final EventLogger eventLogger = getEventLogger(); - final QueueManagingVirtualHost<?> virtualhost = (QueueManagingVirtualHost<?>) addressSpace; - final List<Ticker> tickers = new ArrayList<>(4); - - final Supplier<Long> transactionStartTimeSupplier = new Supplier<Long>() - { - @Override - public Long get() - { - return getTransactionStartTimeLong(); - } - }; - final Supplier<Long> transactionUpdateTimeSupplier = new Supplier<Long>() - { - @Override - public Long get() - { - return getTransactionUpdateTimeLong(); - } - }; - - long notificationRepeatPeriod = - 
getContextValue(Long.class, Session.TRANSACTION_TIMEOUT_NOTIFICATION_REPEAT_PERIOD); - - if (virtualhost.getStoreTransactionOpenTimeoutWarn() > 0) - { - tickers.add(new TransactionTimeoutTicker( - virtualhost.getStoreTransactionOpenTimeoutWarn(), - notificationRepeatPeriod, transactionStartTimeSupplier, - new Action<Long>() - { - @Override - public void performAction(Long age) - { - eventLogger.message(getLogSubject(), ChannelMessages.OPEN_TXN(age)); - } - } - )); - } - if (virtualhost.getStoreTransactionOpenTimeoutClose() > 0) - { - tickers.add(new TransactionTimeoutTicker( - virtualhost.getStoreTransactionOpenTimeoutClose(), - notificationRepeatPeriod, transactionStartTimeSupplier, - new Action<Long>() - { - @Override - public void performAction(Long age) - { - doTimeoutAction(OPEN_TRANSACTION_TIMEOUT_ERROR); - } - } - )); - } - if (virtualhost.getStoreTransactionIdleTimeoutWarn() > 0) - { - tickers.add(new TransactionTimeoutTicker( - virtualhost.getStoreTransactionIdleTimeoutWarn(), - notificationRepeatPeriod, transactionUpdateTimeSupplier, - new Action<Long>() - { - @Override - public void performAction(Long age) - { - eventLogger.message(getLogSubject(), ChannelMessages.IDLE_TXN(age)); - } - } - )); - } - if (virtualhost.getStoreTransactionIdleTimeoutClose() > 0) - { - tickers.add(new TransactionTimeoutTicker( - virtualhost.getStoreTransactionIdleTimeoutClose(), - notificationRepeatPeriod, transactionUpdateTimeSupplier, - new Action<Long>() - { - @Override - public void performAction(Long age) - { - doTimeoutAction(IDLE_TRANSACTION_TIMEOUT_ERROR); - } - } - )); - } - - for (Ticker ticker : tickers) - { - addTicker(ticker); - } - - Action deleteTickerTask = new Action() - { - @Override - public void performAction(Object o) - { - removeDeleteTask(this); - for (Ticker ticker : tickers) - { - removeTicker(ticker); - } - } - }; - addDeleteTask(deleteTickerTask); - } - } - @Override public void addTicker(final Ticker ticker) { 
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/24df0ac1/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java b/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java index 6da15b1..3b609f9 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java +++ b/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java @@ -37,6 +37,7 @@ import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.session.AMQPSession; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.Deletable; public interface AMQPConnection<C extends AMQPConnection<C>> @@ -104,6 +105,11 @@ public interface AMQPConnection<C extends AMQPConnection<C>> Iterator<ServerTransaction> getOpenTransactions(); + void registerTransactionTickers(ServerTransaction serverTransaction, + final Action<String> closeAction, final long notificationRepeatPeriod); + + void unregisterTransactionTickers(ServerTransaction serverTransaction); + enum CloseReason { MANAGEMENT, http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/24df0ac1/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java b/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java index 059db39..944d3bc 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java +++ b/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java @@ -31,8 +31,11 
@@ import java.util.Collection; import java.util.Date; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -51,6 +54,7 @@ import org.apache.qpid.server.connection.ConnectionPrincipal; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.EventLoggerProvider; import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.logging.subjects.ConnectionLogSubject; import org.apache.qpid.server.model.AbstractConfiguredObject; @@ -78,6 +82,7 @@ import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.txn.TransactionObserver; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.FixedKeyMapCreator; +import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost; public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C,T>, T> extends AbstractConfiguredObject<C> @@ -85,6 +90,8 @@ public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C, { public static final FixedKeyMapCreator PUBLISH_ACTION_MAP_CREATOR = new FixedKeyMapCreator("routingKey", "immediate"); + private static final String OPEN_TRANSACTION_TIMEOUT_ERROR = "Open transaction timed out"; + private static final String IDLE_TRANSACTION_TIMEOUT_ERROR = "Idle transaction timed out"; private static final Logger LOGGER = LoggerFactory.getLogger(AbstractAMQPConnection.class); private final Broker<?> _broker; @@ -135,6 +142,8 @@ public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C, private volatile TransactionObserver 
_transactionObserver; private long _maxUncommittedInMemorySize; + private final Map<ServerTransaction, Set<Ticker>> _transactionTickers = new ConcurrentHashMap<>(); + public AbstractAMQPConnection(Broker<?> broker, ServerNetworkConnection network, AmqpPort<?> port, @@ -890,6 +899,73 @@ public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C, _transactionObserver); } + @Override + public void registerTransactionTickers(final ServerTransaction serverTransaction, + final Action<String> closeAction, final long notificationRepeatPeriod) + { + NamedAddressSpace addressSpace = getAddressSpace(); + if (addressSpace instanceof QueueManagingVirtualHost) + { + final QueueManagingVirtualHost<?> virtualhost = (QueueManagingVirtualHost<?>) addressSpace; + + EventLogger eventLogger = virtualhost.getEventLogger(); + + final Set<Ticker> tickers = new LinkedHashSet<>(4); + + if (virtualhost.getStoreTransactionOpenTimeoutWarn() > 0) + { + tickers.add(new TransactionTimeoutTicker( + virtualhost.getStoreTransactionOpenTimeoutWarn(), + notificationRepeatPeriod, serverTransaction::getTransactionStartTime, + age -> eventLogger.message(getLogSubject(), ChannelMessages.OPEN_TXN(age)) + )); + } + if (virtualhost.getStoreTransactionOpenTimeoutClose() > 0) + { + tickers.add(new TransactionTimeoutTicker( + virtualhost.getStoreTransactionOpenTimeoutClose(), + notificationRepeatPeriod, serverTransaction::getTransactionStartTime, + age -> closeAction.performAction(OPEN_TRANSACTION_TIMEOUT_ERROR))); + } + if (virtualhost.getStoreTransactionIdleTimeoutWarn() > 0) + { + tickers.add(new TransactionTimeoutTicker( + virtualhost.getStoreTransactionIdleTimeoutWarn(), + notificationRepeatPeriod, serverTransaction::getTransactionUpdateTime, + age -> eventLogger.message(getLogSubject(), ChannelMessages.IDLE_TXN(age)) + )); + } + if (virtualhost.getStoreTransactionIdleTimeoutClose() > 0) + { + tickers.add(new TransactionTimeoutTicker( + 
virtualhost.getStoreTransactionIdleTimeoutClose(), + notificationRepeatPeriod, serverTransaction::getTransactionUpdateTime, + age -> closeAction.performAction(IDLE_TRANSACTION_TIMEOUT_ERROR) + )); + } + + if (!tickers.isEmpty()) + { + for (Ticker ticker : tickers) + { + getAggregateTicker().addTicker(ticker); + } + notifyWork(); + } + _transactionTickers.put(serverTransaction, tickers); + } + } + + @Override + public void unregisterTransactionTickers(final ServerTransaction serverTransaction) + { + NamedAddressSpace addressSpace = getAddressSpace(); + if (addressSpace instanceof QueueManagingVirtualHost) + { + _transactionTickers.remove(serverTransaction).forEach(t -> getAggregateTicker().removeTicker(t)); + } + } + private class SlowConnectionOpenTicker implements Ticker, SchedulingDelayNotificationListener { private final long _allowedTime; http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/24df0ac1/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index e2211dd..015394e 100644 --- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -68,6 +68,7 @@ import com.google.common.util.concurrent.ListenableFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.qpid.server.model.Session; import org.apache.qpid.server.txn.AsyncCommand; import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.LogSubject; @@ -1147,6 +1148,8 @@ public class ServerSession extends SessionInvoker } 
amqpConnection.decrementTransactionOpenCounter(); _transaction.rollback(); + + amqpConnection.unregisterTransactionTickers(_transaction); } else if(_transaction instanceof DistributedTransaction) { @@ -1240,7 +1243,21 @@ public class ServerSession extends SessionInvoker public void selectTx() { - _transaction = getConnection().getAmqpConnection().createLocalTransaction(); + ServerTransaction txn = _transaction; + AMQPConnection_0_10 amqpConnection = getAMQPConnection(); + if (txn instanceof LocalTransaction) + { + amqpConnection.unregisterTransactionTickers(_transaction); + } + + _transaction = amqpConnection.createLocalTransaction(); + long notificationRepeatPeriod = + getModelObject().getContextValue(Long.class, Session.TRANSACTION_TIMEOUT_NOTIFICATION_REPEAT_PERIOD); + amqpConnection.registerTransactionTickers(_transaction, + message -> amqpConnection.closeSessionAsync(_modelObject, + AMQPConnection.CloseReason.TRANSACTION_TIMEOUT, + (String) message), + notificationRepeatPeriod); } public void selectDtx() @@ -1671,12 +1688,6 @@ public class ServerSession extends SessionInvoker } } - public void doTimeoutAction(final String reason) - { - getAMQPConnection().closeSessionAsync(_modelObject, - AMQPConnection.CloseReason.TRANSACTION_TIMEOUT, reason); - } - private class ResultFuture<T> implements Future<T> { http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/24df0ac1/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java index 62e98fa..dd6ca80 100644 --- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java +++ 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java @@ -113,12 +113,6 @@ public class Session_0_10 extends AbstractAMQPSession<Session_0_10, ConsumerTarg } @Override - public void doTimeoutAction(final String idleTransactionTimeoutError) - { - _serverSession.doTimeoutAction(idleTransactionTimeoutError); - } - - @Override public long getTransactionUpdateTimeLong() { return _serverSession.getTransactionUpdateTimeLong(); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/24df0ac1/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index e106a3f..ffe2c8d 100644 --- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -234,12 +234,6 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0 } - @Override - public void doTimeoutAction(String reason) - { - _connection.sendConnectionCloseAsync(AMQPConnection.CloseReason.TRANSACTION_TIMEOUT, reason); - } - private void message(final LogMessage message) { getEventLogger().message(message); @@ -287,12 +281,6 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0 return getDeliveryMethod.hasDeliveredMessage(); } - /** Sets this channel to be part of a local transaction */ - private void setLocalTransactional() - { - _transaction = _connection.createLocalTransaction(); - } - boolean isTransactional() { return _transaction.isTransactional(); @@ -762,6 +750,8 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, 
ConsumerTarget_0 _connection.incrementTransactionRollbackCounter(); } _connection.decrementTransactionOpenCounter(); + + _connection.unregisterTransactionTickers(_transaction); } _transaction.rollback(); @@ -3282,7 +3272,19 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0 LOGGER.debug("RECV[" + _channelId + "] TxSelect"); } - setLocalTransactional(); + ServerTransaction txn = _transaction; + if (txn instanceof LocalTransaction) + { + getConnection().unregisterTransactionTickers(_transaction); + } + + _transaction = _connection.createLocalTransaction(); + long notificationRepeatPeriod = getContextValue(Long.class, + TRANSACTION_TIMEOUT_NOTIFICATION_REPEAT_PERIOD); + getConnection().registerTransactionTickers(_transaction, + message -> _connection.sendConnectionCloseAsync(AMQPConnection.CloseReason.TRANSACTION_TIMEOUT, + message), + notificationRepeatPeriod); MethodRegistry methodRegistry = _connection.getMethodRegistry(); TxSelectOkBody responseBody = methodRegistry.createTxSelectOkBody(); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/24df0ac1/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java index 9091456..e440187 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java @@ -50,6 +50,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.StreamSupport; +import com.google.common.base.Supplier; import 
com.google.common.collect.Iterators; import com.google.common.collect.PeekingIterator; import com.google.common.collect.Sets; @@ -58,12 +59,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.server.bytebuffer.QpidByteBuffer; +import org.apache.qpid.server.logging.EventLogger; +import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.model.AuthenticationProvider; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Connection; import org.apache.qpid.server.model.NamedAddressSpace; import org.apache.qpid.server.model.Protocol; +import org.apache.qpid.server.model.Session; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.plugin.ConnectionPropertyEnricher; @@ -124,12 +128,15 @@ import org.apache.qpid.server.transport.AggregateTicker; import org.apache.qpid.server.transport.ByteBufferSender; import org.apache.qpid.server.transport.ProtocolEngine; import org.apache.qpid.server.transport.ServerNetworkConnection; +import org.apache.qpid.server.transport.TransactionTimeoutTicker; +import org.apache.qpid.server.transport.network.Ticker; import org.apache.qpid.server.transport.util.Functions; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.util.ServerScopedRuntimeException; +import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException; public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnection_1_0Impl, ConnectionHandler> 
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/24df0ac1/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index 62045a6..aa7ea29 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -1203,12 +1203,6 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget } @Override - public void doTimeoutAction(final String reason) - { - getAMQPConnection().closeSessionAsync(this, AMQPConnection.CloseReason.TRANSACTION_TIMEOUT, reason); - } - - @Override public String toString() { return "Session_1_0[" + _connection + ": " + _sendingChannel + ']'; http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/24df0ac1/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java index cf0ddf3..1394d3a 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java @@ -21,11 +21,12 @@ package org.apache.qpid.server.protocol.v1_0; import java.util.Arrays; 
import java.util.Collections; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import org.apache.qpid.server.bytebuffer.QpidByteBuffer; +import org.apache.qpid.server.model.Session; import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException; import org.apache.qpid.server.protocol.v1_0.type.Binary; import org.apache.qpid.server.protocol.v1_0.type.DeliveryState; @@ -50,7 +51,7 @@ import org.apache.qpid.server.util.ConnectionScopedRuntimeException; public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint<Coordinator> { - private final LinkedHashMap<Integer, ServerTransaction> _createdTransactions = new LinkedHashMap<>(); + private final Map<Integer, ServerTransaction> _createdTransactions = new ConcurrentHashMap<>(); public TxnCoordinatorReceivingLinkEndpoint(final Session_1_0 session, final Link_1_0<Source, Coordinator> link) { @@ -87,12 +88,18 @@ public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEn Session_1_0 session = getSession(); - session.getConnection().receivedComplete(); + AMQPConnection_1_0<?> connection = session.getConnection(); + connection.receivedComplete(); if (command instanceof Declare) { - final IdentifiedTransaction txn = session.getConnection().createIdentifiedTransaction(); + final IdentifiedTransaction txn = connection.createIdentifiedTransaction(); _createdTransactions.put(txn.getId(), txn.getServerTransaction()); + long notificationRepeatPeriod = + getSession().getContextValue(Long.class, Session.TRANSACTION_TIMEOUT_NOTIFICATION_REPEAT_PERIOD); + connection.registerTransactionTickers(txn.getServerTransaction(), + this::doTimeoutAction, + notificationRepeatPeriod); Declared state = new Declared(); @@ -188,6 +195,7 @@ public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEn error.setDescription("The transaction was marked as rollback only due to an earlier issue (e.g. 
a published message was sent settled but could not be enqueued)"); } _createdTransactions.remove(transactionId); + connection.unregisterTransactionTickers(txn); connection.removeTransaction(transactionId); connection.decrementTransactionOpenCounter(); } @@ -204,14 +212,7 @@ public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEn protected void remoteDetachedPerformDetach(Detach detach) { // force rollback of open transactions - for(Map.Entry<Integer, ServerTransaction> entry : _createdTransactions.entrySet()) - { - entry.getValue().rollback(); - AMQPConnection_1_0<?> connection = getSession().getConnection(); - connection.decrementTransactionOpenCounter(); - connection.incrementTransactionRollbackCounter(); - connection.removeTransaction(entry.getKey()); - } + rollbackOpenTransactions(); close(); } @@ -266,4 +267,24 @@ public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEn { } + + private void doTimeoutAction(final String message) + { + rollbackOpenTransactions(); + Error error = new Error(TransactionError.TRANSACTION_TIMEOUT, message); + getSession().getConnection().close(error); + } + + private void rollbackOpenTransactions() + { + for(Map.Entry<Integer, ServerTransaction> entry : _createdTransactions.entrySet()) + { + entry.getValue().rollback(); + AMQPConnection_1_0<?> connection = getSession().getConnection(); + connection.decrementTransactionOpenCounter(); + connection.incrementTransactionRollbackCounter(); + connection.removeTransaction(entry.getKey()); + } + _createdTransactions.clear(); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org