Repository: qpid-broker-j
Updated Branches:
  refs/heads/7.0.x 179cf52d5 -> 8e874ce58


QPID-8091: [Broker-J] [AMQP 1.0] Add store transaction timeout feature

(cherry picked from commit ffd5ad0d456532fb6c9b0ba4e28297c3452bf32c. Merge 
conflicts resolved manually.)


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/24df0ac1
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/24df0ac1
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/24df0ac1

Branch: refs/heads/7.0.x
Commit: 24df0ac1f37d22a98d89303b95f44cd4d03be2d8
Parents: 179cf52
Author: Alex Rudyy <oru...@apache.org>
Authored: Tue Feb 6 15:48:36 2018 +0000
Committer: Alex Rudyy <oru...@apache.org>
Committed: Mon Feb 19 17:50:45 2018 +0000

----------------------------------------------------------------------
 .../apache/qpid/server/session/AMQPSession.java |   2 -
 .../server/session/AbstractAMQPSession.java     | 125 -------------------
 .../qpid/server/transport/AMQPConnection.java   |   6 +
 .../transport/AbstractAMQPConnection.java       |  76 +++++++++++
 .../server/protocol/v0_10/ServerSession.java    |  25 ++--
 .../server/protocol/v0_10/Session_0_10.java     |   6 -
 .../qpid/server/protocol/v0_8/AMQChannel.java   |  28 +++--
 .../protocol/v1_0/AMQPConnection_1_0Impl.java   |   7 ++
 .../qpid/server/protocol/v1_0/Session_1_0.java  |   6 -
 .../TxnCoordinatorReceivingLinkEndpoint.java    |  45 +++++--
 10 files changed, 155 insertions(+), 171 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/24df0ac1/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java
----------------------------------------------------------------------
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java 
b/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java
index 1b224f1..d68592b 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/session/AMQPSession.java
@@ -45,8 +45,6 @@ public interface AMQPSession<S extends 
org.apache.qpid.server.session.AMQPSessio
 
     LogSubject getLogSubject();
 
-    void doTimeoutAction(String reason);
-
     void block(Queue<?> queue);
 
     void unblock(Queue<?> queue);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/24df0ac1/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
----------------------------------------------------------------------
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
 
b/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
index b68ed7f..6661de1 100644
--- 
a/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
+++ 
b/broker-core/src/main/java/org/apache/qpid/server/session/AbstractAMQPSession.java
@@ -21,7 +21,6 @@
 package org.apache.qpid.server.session;
 
 import java.security.AccessControlContext;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -32,7 +31,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.security.auth.Subject;
 
-import com.google.common.base.Supplier;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -51,25 +49,20 @@ import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.Connection;
 import org.apache.qpid.server.model.LifetimePolicy;
-import org.apache.qpid.server.model.NamedAddressSpace;
 import org.apache.qpid.server.model.Session;
 import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.StateTransition;
 import org.apache.qpid.server.protocol.PublishAuthorisationCache;
 import org.apache.qpid.server.security.SecurityToken;
 import org.apache.qpid.server.transport.AMQPConnection;
-import org.apache.qpid.server.transport.TransactionTimeoutTicker;
 import org.apache.qpid.server.transport.network.Ticker;
 import org.apache.qpid.server.util.Action;
-import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
 
 public abstract class AbstractAMQPSession<S extends AbstractAMQPSession<S, X>,
                                           X extends ConsumerTarget<X>>
         extends AbstractConfiguredObject<S>
         implements AMQPSession<S, X>, EventLoggerProvider
 {
-    private static final String OPEN_TRANSACTION_TIMEOUT_ERROR = "Open 
transaction timed out";
-    private static final String IDLE_TRANSACTION_TIMEOUT_ERROR = "Idle 
transaction timed out";
     private final Action _deleteModelTask;
     private final AMQPConnection<?> _connection;
     private final int _sessionId;
@@ -144,13 +137,6 @@ public abstract class AbstractAMQPSession<S extends 
AbstractAMQPSession<S, X>,
     }
 
     @Override
-    protected void postResolveChildren()
-    {
-        super.postResolveChildren();
-        registerTransactionTimeoutTickers(_connection);
-    }
-
-    @Override
     public int getChannelId()
     {
         return _sessionId;
@@ -201,117 +187,6 @@ public abstract class AbstractAMQPSession<S extends 
AbstractAMQPSession<S, X>,
         return _connection.getEventLogger();
     }
 
-    private void registerTransactionTimeoutTickers(Connection<?> 
amqpConnection)
-    {
-        NamedAddressSpace addressSpace = amqpConnection.getAddressSpace();
-        if (addressSpace instanceof QueueManagingVirtualHost)
-        {
-            final EventLogger eventLogger = getEventLogger();
-            final QueueManagingVirtualHost<?> virtualhost = 
(QueueManagingVirtualHost<?>) addressSpace;
-            final List<Ticker> tickers = new ArrayList<>(4);
-
-            final Supplier<Long> transactionStartTimeSupplier = new 
Supplier<Long>()
-            {
-                @Override
-                public Long get()
-                {
-                    return getTransactionStartTimeLong();
-                }
-            };
-            final Supplier<Long> transactionUpdateTimeSupplier = new 
Supplier<Long>()
-            {
-                @Override
-                public Long get()
-                {
-                    return getTransactionUpdateTimeLong();
-                }
-            };
-
-            long notificationRepeatPeriod =
-                    getContextValue(Long.class, 
Session.TRANSACTION_TIMEOUT_NOTIFICATION_REPEAT_PERIOD);
-
-            if (virtualhost.getStoreTransactionOpenTimeoutWarn() > 0)
-            {
-                tickers.add(new TransactionTimeoutTicker(
-                        virtualhost.getStoreTransactionOpenTimeoutWarn(),
-                        notificationRepeatPeriod, transactionStartTimeSupplier,
-                        new Action<Long>()
-                        {
-                            @Override
-                            public void performAction(Long age)
-                            {
-                                eventLogger.message(getLogSubject(), 
ChannelMessages.OPEN_TXN(age));
-                            }
-                        }
-                ));
-            }
-            if (virtualhost.getStoreTransactionOpenTimeoutClose() > 0)
-            {
-                tickers.add(new TransactionTimeoutTicker(
-                        virtualhost.getStoreTransactionOpenTimeoutClose(),
-                        notificationRepeatPeriod, transactionStartTimeSupplier,
-                        new Action<Long>()
-                        {
-                            @Override
-                            public void performAction(Long age)
-                            {
-                                
doTimeoutAction(OPEN_TRANSACTION_TIMEOUT_ERROR);
-                            }
-                        }
-                ));
-            }
-            if (virtualhost.getStoreTransactionIdleTimeoutWarn() > 0)
-            {
-                tickers.add(new TransactionTimeoutTicker(
-                        virtualhost.getStoreTransactionIdleTimeoutWarn(),
-                        notificationRepeatPeriod, 
transactionUpdateTimeSupplier,
-                        new Action<Long>()
-                        {
-                            @Override
-                            public void performAction(Long age)
-                            {
-                                eventLogger.message(getLogSubject(), 
ChannelMessages.IDLE_TXN(age));
-                            }
-                        }
-                ));
-            }
-            if (virtualhost.getStoreTransactionIdleTimeoutClose() > 0)
-            {
-                tickers.add(new TransactionTimeoutTicker(
-                        virtualhost.getStoreTransactionIdleTimeoutClose(),
-                        notificationRepeatPeriod, 
transactionUpdateTimeSupplier,
-                        new Action<Long>()
-                        {
-                            @Override
-                            public void performAction(Long age)
-                            {
-                                
doTimeoutAction(IDLE_TRANSACTION_TIMEOUT_ERROR);
-                            }
-                        }
-                ));
-            }
-
-            for (Ticker ticker : tickers)
-            {
-                addTicker(ticker);
-            }
-
-            Action deleteTickerTask = new Action()
-            {
-                @Override
-                public void performAction(Object o)
-                {
-                    removeDeleteTask(this);
-                    for (Ticker ticker : tickers)
-                    {
-                        removeTicker(ticker);
-                    }
-                }
-            };
-            addDeleteTask(deleteTickerTask);
-        }
-    }
-
     @Override
     public void addTicker(final Ticker ticker)
     {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/24df0ac1/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
----------------------------------------------------------------------
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
 
b/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
index 6da15b1..3b609f9 100644
--- 
a/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
+++ 
b/broker-core/src/main/java/org/apache/qpid/server/transport/AMQPConnection.java
@@ -37,6 +37,7 @@ import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.session.AMQPSession;
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.Deletable;
 
 public interface AMQPConnection<C extends AMQPConnection<C>>
@@ -104,6 +105,11 @@ public interface AMQPConnection<C extends 
AMQPConnection<C>>
 
     Iterator<ServerTransaction> getOpenTransactions();
 
+    void registerTransactionTickers(ServerTransaction serverTransaction,
+                                    final Action<String> closeAction, final 
long notificationRepeatPeriod);
+
+    void unregisterTransactionTickers(ServerTransaction serverTransaction);
+
     enum CloseReason
     {
         MANAGEMENT,

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/24df0ac1/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
----------------------------------------------------------------------
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
 
b/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
index 059db39..944d3bc 100644
--- 
a/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
+++ 
b/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
@@ -31,8 +31,11 @@ import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -51,6 +54,7 @@ import org.apache.qpid.server.connection.ConnectionPrincipal;
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.EventLoggerProvider;
 import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.messages.ChannelMessages;
 import org.apache.qpid.server.logging.messages.ConnectionMessages;
 import org.apache.qpid.server.logging.subjects.ConnectionLogSubject;
 import org.apache.qpid.server.model.AbstractConfiguredObject;
@@ -78,6 +82,7 @@ import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.txn.TransactionObserver;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.FixedKeyMapCreator;
+import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
 
 public abstract class AbstractAMQPConnection<C extends 
AbstractAMQPConnection<C,T>, T>
         extends AbstractConfiguredObject<C>
@@ -85,6 +90,8 @@ public abstract class AbstractAMQPConnection<C extends 
AbstractAMQPConnection<C,
 
 {
     public static final FixedKeyMapCreator PUBLISH_ACTION_MAP_CREATOR = new 
FixedKeyMapCreator("routingKey", "immediate");
+    private static final String OPEN_TRANSACTION_TIMEOUT_ERROR = "Open 
transaction timed out";
+    private static final String IDLE_TRANSACTION_TIMEOUT_ERROR = "Idle 
transaction timed out";
     private static final Logger LOGGER = 
LoggerFactory.getLogger(AbstractAMQPConnection.class);
 
     private final Broker<?> _broker;
@@ -135,6 +142,8 @@ public abstract class AbstractAMQPConnection<C extends 
AbstractAMQPConnection<C,
     private volatile TransactionObserver _transactionObserver;
     private long _maxUncommittedInMemorySize;
 
+    private final Map<ServerTransaction, Set<Ticker>> _transactionTickers = 
new ConcurrentHashMap<>();
+
     public AbstractAMQPConnection(Broker<?> broker,
                                   ServerNetworkConnection network,
                                   AmqpPort<?> port,
@@ -890,6 +899,73 @@ public abstract class AbstractAMQPConnection<C extends 
AbstractAMQPConnection<C,
                                     _transactionObserver);
     }
 
+    @Override
+    public void registerTransactionTickers(final ServerTransaction 
serverTransaction,
+                                           final Action<String> closeAction, 
final long notificationRepeatPeriod)
+    {
+        NamedAddressSpace addressSpace = getAddressSpace();
+        if (addressSpace instanceof QueueManagingVirtualHost)
+        {
+            final QueueManagingVirtualHost<?> virtualhost = 
(QueueManagingVirtualHost<?>) addressSpace;
+
+            EventLogger eventLogger = virtualhost.getEventLogger();
+
+            final Set<Ticker> tickers = new LinkedHashSet<>(4);
+
+            if (virtualhost.getStoreTransactionOpenTimeoutWarn() > 0)
+            {
+                tickers.add(new TransactionTimeoutTicker(
+                        virtualhost.getStoreTransactionOpenTimeoutWarn(),
+                        notificationRepeatPeriod, 
serverTransaction::getTransactionStartTime,
+                        age -> eventLogger.message(getLogSubject(), 
ChannelMessages.OPEN_TXN(age))
+                ));
+            }
+            if (virtualhost.getStoreTransactionOpenTimeoutClose() > 0)
+            {
+                tickers.add(new TransactionTimeoutTicker(
+                        virtualhost.getStoreTransactionOpenTimeoutClose(),
+                        notificationRepeatPeriod, 
serverTransaction::getTransactionStartTime,
+                        age -> 
closeAction.performAction(OPEN_TRANSACTION_TIMEOUT_ERROR)));
+            }
+            if (virtualhost.getStoreTransactionIdleTimeoutWarn() > 0)
+            {
+                tickers.add(new TransactionTimeoutTicker(
+                        virtualhost.getStoreTransactionIdleTimeoutWarn(),
+                        notificationRepeatPeriod, 
serverTransaction::getTransactionUpdateTime,
+                        age -> eventLogger.message(getLogSubject(), 
ChannelMessages.IDLE_TXN(age))
+                ));
+            }
+            if (virtualhost.getStoreTransactionIdleTimeoutClose() > 0)
+            {
+                tickers.add(new TransactionTimeoutTicker(
+                        virtualhost.getStoreTransactionIdleTimeoutClose(),
+                        notificationRepeatPeriod, 
serverTransaction::getTransactionUpdateTime,
+                        age -> 
closeAction.performAction(IDLE_TRANSACTION_TIMEOUT_ERROR)
+                ));
+            }
+
+            if (!tickers.isEmpty())
+            {
+                for (Ticker ticker : tickers)
+                {
+                    getAggregateTicker().addTicker(ticker);
+                }
+                notifyWork();
+            }
+            _transactionTickers.put(serverTransaction, tickers);
+        }
+    }
+
+    @Override
+    public void unregisterTransactionTickers(final ServerTransaction 
serverTransaction)
+    {
+        NamedAddressSpace addressSpace = getAddressSpace();
+        if (addressSpace instanceof QueueManagingVirtualHost)
+        {
+            _transactionTickers.remove(serverTransaction).forEach(t -> 
getAggregateTicker().removeTicker(t));
+        }
+    }
+
     private class SlowConnectionOpenTicker implements Ticker, 
SchedulingDelayNotificationListener
     {
         private final long _allowedTime;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/24df0ac1/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index e2211dd..015394e 100644
--- 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -68,6 +68,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.server.model.Session;
 import org.apache.qpid.server.txn.AsyncCommand;
 import org.apache.qpid.server.logging.LogMessage;
 import org.apache.qpid.server.logging.LogSubject;
@@ -1147,6 +1148,8 @@ public class ServerSession extends SessionInvoker
             }
             amqpConnection.decrementTransactionOpenCounter();
             _transaction.rollback();
+
+            amqpConnection.unregisterTransactionTickers(_transaction);
         }
         else if(_transaction instanceof DistributedTransaction)
         {
@@ -1240,7 +1243,21 @@ public class ServerSession extends SessionInvoker
 
     public void selectTx()
     {
-        _transaction = 
getConnection().getAmqpConnection().createLocalTransaction();
+        ServerTransaction txn = _transaction;
+        AMQPConnection_0_10 amqpConnection = getAMQPConnection();
+        if (txn instanceof LocalTransaction)
+        {
+            amqpConnection.unregisterTransactionTickers(_transaction);
+        }
+
+        _transaction = amqpConnection.createLocalTransaction();
+        long notificationRepeatPeriod =
+                getModelObject().getContextValue(Long.class, 
Session.TRANSACTION_TIMEOUT_NOTIFICATION_REPEAT_PERIOD);
+        amqpConnection.registerTransactionTickers(_transaction,
+                                                  message -> 
amqpConnection.closeSessionAsync(_modelObject,
+                                                                               
               AMQPConnection.CloseReason.TRANSACTION_TIMEOUT,
+                                                                               
               (String) message),
+                                                  notificationRepeatPeriod);
     }
 
     public void selectDtx()
@@ -1671,12 +1688,6 @@ public class ServerSession extends SessionInvoker
         }
     }
 
-    public void doTimeoutAction(final String reason)
-    {
-        getAMQPConnection().closeSessionAsync(_modelObject,
-                                              
AMQPConnection.CloseReason.TRANSACTION_TIMEOUT, reason);
-    }
-
     private class ResultFuture<T> implements Future<T>
     {
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/24df0ac1/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
index 62e98fa..dd6ca80 100644
--- 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
+++ 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Session_0_10.java
@@ -113,12 +113,6 @@ public class Session_0_10 extends 
AbstractAMQPSession<Session_0_10, ConsumerTarg
     }
 
     @Override
-    public void doTimeoutAction(final String idleTransactionTimeoutError)
-    {
-        _serverSession.doTimeoutAction(idleTransactionTimeoutError);
-    }
-
-    @Override
     public long getTransactionUpdateTimeLong()
     {
         return _serverSession.getTransactionUpdateTimeLong();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/24df0ac1/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
 
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index e106a3f..ffe2c8d 100644
--- 
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ 
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -234,12 +234,6 @@ public class AMQChannel extends 
AbstractAMQPSession<AMQChannel, ConsumerTarget_0
 
     }
 
-    @Override
-    public void doTimeoutAction(String reason)
-    {
-        
_connection.sendConnectionCloseAsync(AMQPConnection.CloseReason.TRANSACTION_TIMEOUT,
 reason);
-    }
-
     private void message(final LogMessage message)
     {
         getEventLogger().message(message);
@@ -287,12 +281,6 @@ public class AMQChannel extends 
AbstractAMQPSession<AMQChannel, ConsumerTarget_0
         return getDeliveryMethod.hasDeliveredMessage();
     }
 
-    /** Sets this channel to be part of a local transaction */
-    private void setLocalTransactional()
-    {
-        _transaction = _connection.createLocalTransaction();
-    }
-
     boolean isTransactional()
     {
         return _transaction.isTransactional();
@@ -762,6 +750,8 @@ public class AMQChannel extends 
AbstractAMQPSession<AMQChannel, ConsumerTarget_0
                     _connection.incrementTransactionRollbackCounter();
                 }
                 _connection.decrementTransactionOpenCounter();
+
+                _connection.unregisterTransactionTickers(_transaction);
             }
 
             _transaction.rollback();
@@ -3282,7 +3272,19 @@ public class AMQChannel extends 
AbstractAMQPSession<AMQChannel, ConsumerTarget_0
             LOGGER.debug("RECV[" + _channelId + "] TxSelect");
         }
 
-        setLocalTransactional();
+        ServerTransaction txn = _transaction;
+        if (txn instanceof LocalTransaction)
+        {
+            getConnection().unregisterTransactionTickers(_transaction);
+        }
+
+        _transaction = _connection.createLocalTransaction();
+        long notificationRepeatPeriod = getContextValue(Long.class,
+                                                 
TRANSACTION_TIMEOUT_NOTIFICATION_REPEAT_PERIOD);
+        getConnection().registerTransactionTickers(_transaction,
+                                                   message -> 
_connection.sendConnectionCloseAsync(AMQPConnection.CloseReason.TRANSACTION_TIMEOUT,
+                                                                               
                    message),
+                                                   notificationRepeatPeriod);
 
         MethodRegistry methodRegistry = _connection.getMethodRegistry();
         TxSelectOkBody responseBody = methodRegistry.createTxSelectOkBody();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/24df0ac1/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
index 9091456..e440187 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
@@ -50,6 +50,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
+import com.google.common.base.Supplier;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.PeekingIterator;
 import com.google.common.collect.Sets;
@@ -58,12 +59,15 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.logging.messages.ChannelMessages;
 import org.apache.qpid.server.logging.messages.ConnectionMessages;
 import org.apache.qpid.server.model.AuthenticationProvider;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.Connection;
 import org.apache.qpid.server.model.NamedAddressSpace;
 import org.apache.qpid.server.model.Protocol;
+import org.apache.qpid.server.model.Session;
 import org.apache.qpid.server.model.Transport;
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.plugin.ConnectionPropertyEnricher;
@@ -124,12 +128,15 @@ import org.apache.qpid.server.transport.AggregateTicker;
 import org.apache.qpid.server.transport.ByteBufferSender;
 import org.apache.qpid.server.transport.ProtocolEngine;
 import org.apache.qpid.server.transport.ServerNetworkConnection;
+import org.apache.qpid.server.transport.TransactionTimeoutTicker;
+import org.apache.qpid.server.transport.network.Ticker;
 import org.apache.qpid.server.transport.util.Functions;
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
+import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
 
 public class AMQPConnection_1_0Impl extends 
AbstractAMQPConnection<AMQPConnection_1_0Impl, ConnectionHandler>

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/24df0ac1/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 62045a6..aa7ea29 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -1203,12 +1203,6 @@ public class Session_1_0 extends 
AbstractAMQPSession<Session_1_0, ConsumerTarget
     }
 
     @Override
-    public void doTimeoutAction(final String reason)
-    {
-        getAMQPConnection().closeSessionAsync(this, 
AMQPConnection.CloseReason.TRANSACTION_TIMEOUT, reason);
-    }
-
-    @Override
     public String toString()
     {
         return "Session_1_0[" + _connection + ": " + _sendingChannel + ']';

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/24df0ac1/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
index cf0ddf3..1394d3a 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
@@ -21,11 +21,12 @@ package org.apache.qpid.server.protocol.v1_0;
 
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.model.Session;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
@@ -50,7 +51,7 @@ import 
org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 
 public class TxnCoordinatorReceivingLinkEndpoint extends 
AbstractReceivingLinkEndpoint<Coordinator>
 {
-    private final LinkedHashMap<Integer, ServerTransaction> 
_createdTransactions = new LinkedHashMap<>();
+    private final Map<Integer, ServerTransaction> _createdTransactions = new 
ConcurrentHashMap<>();
 
     public TxnCoordinatorReceivingLinkEndpoint(final Session_1_0 session, 
final Link_1_0<Source, Coordinator> link)
     {
@@ -87,12 +88,18 @@ public class TxnCoordinatorReceivingLinkEndpoint extends 
AbstractReceivingLinkEn
 
                         Session_1_0 session = getSession();
 
-                        session.getConnection().receivedComplete();
+                        AMQPConnection_1_0<?> connection = 
session.getConnection();
+                        connection.receivedComplete();
 
                         if (command instanceof Declare)
                         {
-                            final IdentifiedTransaction txn = 
session.getConnection().createIdentifiedTransaction();
+                            final IdentifiedTransaction txn = 
connection.createIdentifiedTransaction();
                             _createdTransactions.put(txn.getId(), 
txn.getServerTransaction());
+                            long notificationRepeatPeriod =
+                                    getSession().getContextValue(Long.class, 
Session.TRANSACTION_TIMEOUT_NOTIFICATION_REPEAT_PERIOD);
+                            
connection.registerTransactionTickers(txn.getServerTransaction(),
+                                                                  
this::doTimeoutAction,
+                                                                  
notificationRepeatPeriod);
 
                             Declared state = new Declared();
 
@@ -188,6 +195,7 @@ public class TxnCoordinatorReceivingLinkEndpoint extends 
AbstractReceivingLinkEn
                 error.setDescription("The transaction was marked as rollback 
only due to an earlier issue (e.g. a published message was sent settled but 
could not be enqueued)");
             }
             _createdTransactions.remove(transactionId);
+            connection.unregisterTransactionTickers(txn);
             connection.removeTransaction(transactionId);
             connection.decrementTransactionOpenCounter();
         }
@@ -204,14 +212,7 @@ public class TxnCoordinatorReceivingLinkEndpoint extends 
AbstractReceivingLinkEn
     protected void remoteDetachedPerformDetach(Detach detach)
     {
         // force rollback of open transactions
-        for(Map.Entry<Integer, ServerTransaction> entry : 
_createdTransactions.entrySet())
-        {
-            entry.getValue().rollback();
-            AMQPConnection_1_0<?> connection = getSession().getConnection();
-            connection.decrementTransactionOpenCounter();
-            connection.incrementTransactionRollbackCounter();
-            connection.removeTransaction(entry.getKey());
-        }
+        rollbackOpenTransactions();
         close();
     }
 
@@ -266,4 +267,24 @@ public class TxnCoordinatorReceivingLinkEndpoint extends 
AbstractReceivingLinkEn
     {
 
     }
+
+    private void doTimeoutAction(final String message)
+    {
+        rollbackOpenTransactions();
+        Error error = new Error(TransactionError.TRANSACTION_TIMEOUT, message);
+        getSession().getConnection().close(error);
+    }
+
+    private void rollbackOpenTransactions()
+    {
+        for(Map.Entry<Integer, ServerTransaction> entry : 
_createdTransactions.entrySet())
+        {
+            entry.getValue().rollback();
+            AMQPConnection_1_0<?> connection = getSession().getConnection();
+            connection.decrementTransactionOpenCounter();
+            connection.incrementTransactionRollbackCounter();
+            connection.removeTransaction(entry.getKey());
+        }
+        _createdTransactions.clear();
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to