QPID-8091: [Broker-J][AMQP 0-10] Close 0-10 connection on transaction timeout

(cherry picked from commit a9667120ed7a64264a50c80a2938a6c73c3f93f2; the merge
conflict was resolved manually)


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/8ee099a9
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/8ee099a9
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/8ee099a9

Branch: refs/heads/7.0.x
Commit: 8ee099a987562e9a98074fbd6aa4698af6796e49
Parents: f84a341
Author: Alex Rudyy <oru...@apache.org>
Authored: Thu Feb 8 16:07:48 2018 +0000
Committer: Alex Rudyy <oru...@apache.org>
Committed: Mon Feb 19 23:19:57 2018 +0000

----------------------------------------------------------------------
 .../protocol/v0_10/AMQPConnection_0_10Impl.java | 11 +++++---
 .../server/protocol/v0_10/ServerConnection.java | 29 ++++++++++++++++++++
 .../server/protocol/v0_10/ServerSession.java    |  8 ++++--
 .../transacted/TransactionTimeoutTestCase.java  | 10 +++----
 4 files changed, 46 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8ee099a9/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
index 3c9fbd1..055f935 100755
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
@@ -72,8 +72,6 @@ public class AMQPConnection_0_10Impl extends AbstractAMQPConnection<AMQPConnecti
 
     private final Set<AMQPSession<?,?>> _sessionsWithWork =
             Collections.newSetFromMap(new ConcurrentHashMap<AMQPSession<?,?>, Boolean>());
-    private volatile String _closeCause;
-
 
     public AMQPConnection_0_10Impl(final Broker<?> broker,
                                    ServerNetworkConnection network,
@@ -302,7 +300,7 @@ public class AMQPConnection_0_10Impl extends AbstractAMQPConnection<AMQPConnecti
     @Override
     public void sendConnectionCloseAsync(final CloseReason reason, final String description)
     {
-        _closeCause = description;
+        _connection.setConnectionCloseCause(reason, description);
         stopConnection();
         // Best mapping for all reasons is "forced"
         _connection.sendConnectionCloseAsync(ConnectionCloseCode.CONNECTION_FORCED, description);
@@ -365,7 +363,12 @@ public class AMQPConnection_0_10Impl extends AbstractAMQPConnection<AMQPConnecti
     @Override
     protected String getCloseCause()
     {
-        return _closeCause;
+        String connectionCloseMessage = _connection.getConnectionCloseMessage();
+        if (connectionCloseMessage == null)
+        {
+            return null;
+        }
+        return _connection.getConnectionCloseCode() + " - " + connectionCloseMessage;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8ee099a9/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
index 8032b35..1166058 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
@@ -106,6 +106,8 @@ public class ServerConnection extends ConnectionInvoker
     private String locale;
     private SocketAddress _remoteAddress;
     private int _heartBeatDelay;
+    private volatile int _connectionCloseCode;
+    private volatile String _connectionCloseMessage;
 
     public ServerConnection(final long connectionId,
                             Broker<?> broker,
@@ -874,4 +876,31 @@ public class ServerConnection extends ConnectionInvoker
         return _ignoreFutureInput;
     }
 
+    void setConnectionCloseCause(final AMQPConnection.CloseReason reason, final String description)
+    {
+        final int cause;
+        switch (reason)
+        {
+            case MANAGEMENT:
+                cause = ErrorCodes.CONNECTION_FORCED;
+                break;
+            case TRANSACTION_TIMEOUT:
+                cause = ErrorCodes.RESOURCE_ERROR;
+                break;
+            default:
+                cause = ErrorCodes.INTERNAL_ERROR;
+        }
+        _connectionCloseCode = cause;
+        _connectionCloseMessage = description;
+    }
+
+    int getConnectionCloseCode()
+    {
+        return _connectionCloseCode;
+    }
+
+    String getConnectionCloseMessage()
+    {
+        return _connectionCloseMessage;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8ee099a9/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 015394e..32a2474 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -1168,6 +1168,11 @@ public class ServerSession extends SessionInvoker
         }
 
         LogMessage operationalLoggingMessage = _forcedCloseLogMessage.get();
+        if (operationalLoggingMessage == null && getConnection().getConnectionCloseMessage() != null)
+        {
+            operationalLoggingMessage = ChannelMessages.CLOSE_FORCED(getConnection().getConnectionCloseCode(),
+                                                                     getConnection().getConnectionCloseMessage());
+        }
         if (operationalLoggingMessage == null)
         {
             operationalLoggingMessage = ChannelMessages.CLOSE();
@@ -1254,8 +1259,7 @@ public class ServerSession extends SessionInvoker
         long notificationRepeatPeriod =
                 getModelObject().getContextValue(Long.class, Session.TRANSACTION_TIMEOUT_NOTIFICATION_REPEAT_PERIOD);
         amqpConnection.registerTransactionTickers(_transaction,
-                                                  message -> amqpConnection.closeSessionAsync(_modelObject,
-                                                                                              AMQPConnection.CloseReason.TRANSACTION_TIMEOUT,
+                                                  message -> amqpConnection.sendConnectionCloseAsync(AMQPConnection.CloseReason.TRANSACTION_TIMEOUT,
                                                                                               (String) message),
                                                   notificationRepeatPeriod);
     }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/8ee099a9/systests/src/test/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java b/systests/src/test/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java
index 09b7ac7..6b4f8ba 100644
--- a/systests/src/test/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java
+++ b/systests/src/test/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java
@@ -53,8 +53,8 @@ public abstract class TransactionTimeoutTestCase extends QpidBrokerTestCase impl
 {
     private static final int ALERT_MESSAGE_TOLERANCE = 6;
     public static final String TEXT = "0123456789abcdefghiforgettherest";
-    public static final String CHN_OPEN_TXN = "CHN-1007";
-    public static final String CHN_IDLE_TXN = "CHN-1008";
+    public static final String OPEN_TXN = "CON-1010";
+    public static final String IDLE_TXN = "CHN-1011";
     public static final String IDLE = "Idle";
     public static final String OPEN = "Open";
     
@@ -165,8 +165,8 @@ public abstract class TransactionTimeoutTestCase extends QpidBrokerTestCase impl
      */
     protected void monitor(int idle, int open) throws Exception
     {
-        List<String> idleMsgs = _monitor.findMatches(CHN_IDLE_TXN);
-        List<String> openMsgs = _monitor.findMatches(CHN_OPEN_TXN);
+        List<String> idleMsgs = _monitor.findMatches(IDLE_TXN);
+        List<String> openMsgs = _monitor.findMatches(OPEN_TXN);
         
         String idleErr = "Expected " + idle + " but found " + idleMsgs.size() + " txn idle messages";
         String openErr = "Expected " + open + " but found " + openMsgs.size() + " txn open messages";
@@ -218,8 +218,6 @@ public abstract class TransactionTimeoutTestCase extends QpidBrokerTestCase impl
         assertNotNull("Linked exception message should not be null", _linkedExceptionMessage);
         assertTrue("Linked exception message '" + _linkedExceptionMessage + "' should contain '" + reason + "'",
                    _linkedExceptionMessage.contains(reason + " transaction timed out"));
-        assertTrue("Linked exception should have an error code", _linkedExceptionCode != 0);
-        assertEquals("Linked exception error code should be 506", ErrorCodes.RESOURCE_ERROR, _linkedExceptionCode);
     }
 
     /** @see javax.jms.ExceptionListener#onException(javax.jms.JMSException) */


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to