Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 31037b059 -> a9667120e


QPID-8091: [Broker-J][AMQP 0-10] Close 0-10 connection on transaction timeout


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/a9667120
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/a9667120
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/a9667120

Branch: refs/heads/master
Commit: a9667120ed7a64264a50c80a2938a6c73c3f93f2
Parents: 53cf020
Author: Alex Rudyy <oru...@apache.org>
Authored: Thu Feb 8 16:07:48 2018 +0000
Committer: Alex Rudyy <oru...@apache.org>
Committed: Thu Feb 8 16:09:09 2018 +0000

----------------------------------------------------------------------
 .../protocol/v0_10/AMQPConnection_0_10Impl.java | 11 +++++---
 .../server/protocol/v0_10/ServerConnection.java | 29 ++++++++++++++++++++
 .../server/protocol/v0_10/ServerSession.java    |  8 ++++--
 .../tests/protocol/v0_10/TransactionTest.java   | 26 ++++++------------
 4 files changed, 50 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a9667120/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
index 3c9fbd1..055f935 100755
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
@@ -72,8 +72,6 @@ public class AMQPConnection_0_10Impl extends AbstractAMQPConnection<AMQPConnecti
 
     private final Set<AMQPSession<?,?>> _sessionsWithWork =
             Collections.newSetFromMap(new ConcurrentHashMap<AMQPSession<?,?>, Boolean>());
-    private volatile String _closeCause;
-
 
     public AMQPConnection_0_10Impl(final Broker<?> broker,
                                    ServerNetworkConnection network,
@@ -302,7 +300,7 @@ public class AMQPConnection_0_10Impl extends AbstractAMQPConnection<AMQPConnecti
     @Override
     public void sendConnectionCloseAsync(final CloseReason reason, final String description)
     {
-        _closeCause = description;
+        _connection.setConnectionCloseCause(reason, description);
         stopConnection();
         // Best mapping for all reasons is "forced"
         _connection.sendConnectionCloseAsync(ConnectionCloseCode.CONNECTION_FORCED, description);
@@ -365,7 +363,12 @@ public class AMQPConnection_0_10Impl extends AbstractAMQPConnection<AMQPConnecti
     @Override
     protected String getCloseCause()
     {
-        return _closeCause;
+        String connectionCloseMessage = _connection.getConnectionCloseMessage();
+        if (connectionCloseMessage == null)
+        {
+            return null;
+        }
+        return _connection.getConnectionCloseCode() + " - " + connectionCloseMessage;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a9667120/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
index 8032b35..1166058 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
@@ -106,6 +106,8 @@ public class ServerConnection extends ConnectionInvoker
     private String locale;
     private SocketAddress _remoteAddress;
     private int _heartBeatDelay;
+    private volatile int _connectionCloseCode;
+    private volatile String _connectionCloseMessage;
 
     public ServerConnection(final long connectionId,
                             Broker<?> broker,
@@ -874,4 +876,31 @@ public class ServerConnection extends ConnectionInvoker
         return _ignoreFutureInput;
     }
 
+    void setConnectionCloseCause(final AMQPConnection.CloseReason reason, final String description)
+    {
+        final int cause;
+        switch (reason)
+        {
+            case MANAGEMENT:
+                cause = ErrorCodes.CONNECTION_FORCED;
+                break;
+            case TRANSACTION_TIMEOUT:
+                cause = ErrorCodes.RESOURCE_ERROR;
+                break;
+            default:
+                cause = ErrorCodes.INTERNAL_ERROR;
+        }
+        _connectionCloseCode = cause;
+        _connectionCloseMessage = description;
+    }
+
+    int getConnectionCloseCode()
+    {
+        return _connectionCloseCode;
+    }
+
+    String getConnectionCloseMessage()
+    {
+        return _connectionCloseMessage;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a9667120/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 015394e..32a2474 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -1168,6 +1168,11 @@ public class ServerSession extends SessionInvoker
         }
 
         LogMessage operationalLoggingMessage = _forcedCloseLogMessage.get();
+        if (operationalLoggingMessage == null && getConnection().getConnectionCloseMessage() != null)
+        {
+            operationalLoggingMessage = ChannelMessages.CLOSE_FORCED(getConnection().getConnectionCloseCode(),
+                                                                     getConnection().getConnectionCloseMessage());
+        }
         if (operationalLoggingMessage == null)
         {
             operationalLoggingMessage = ChannelMessages.CLOSE();
@@ -1254,8 +1259,7 @@ public class ServerSession extends SessionInvoker
         long notificationRepeatPeriod =
                 getModelObject().getContextValue(Long.class, Session.TRANSACTION_TIMEOUT_NOTIFICATION_REPEAT_PERIOD);
         amqpConnection.registerTransactionTickers(_transaction,
-                                                  message -> amqpConnection.closeSessionAsync(_modelObject,
-                                                                                              AMQPConnection.CloseReason.TRANSACTION_TIMEOUT,
+                                                  message -> amqpConnection.sendConnectionCloseAsync(AMQPConnection.CloseReason.TRANSACTION_TIMEOUT,
                                                                                               (String) message),
                                                   notificationRepeatPeriod);
     }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a9667120/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/TransactionTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/TransactionTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/TransactionTest.java
index 7259e66..7c48e36 100644
--- a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/TransactionTest.java
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/TransactionTest.java
@@ -33,19 +33,15 @@ import java.net.InetSocketAddress;
 import org.junit.Before;
 import org.junit.Test;
 
-import org.apache.qpid.server.protocol.v0_10.transport.ExecutionErrorCode;
-import org.apache.qpid.server.protocol.v0_10.transport.ExecutionException;
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionClose;
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionCloseCode;
 import org.apache.qpid.server.protocol.v0_10.transport.MessageAcceptMode;
 import org.apache.qpid.server.protocol.v0_10.transport.MessageAcquireMode;
 import org.apache.qpid.server.protocol.v0_10.transport.MessageCreditUnit;
 import org.apache.qpid.server.protocol.v0_10.transport.MessageTransfer;
 import org.apache.qpid.server.protocol.v0_10.transport.Range;
 import org.apache.qpid.server.protocol.v0_10.transport.RangeSet;
-import org.apache.qpid.server.protocol.v0_10.transport.SessionCommandPoint;
 import org.apache.qpid.server.protocol.v0_10.transport.SessionCompleted;
-import org.apache.qpid.server.protocol.v0_10.transport.SessionConfirmed;
-import org.apache.qpid.server.protocol.v0_10.transport.SessionDetach;
-import org.apache.qpid.server.protocol.v0_10.transport.SessionFlush;
 import org.apache.qpid.tests.protocol.Response;
 import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.utils.BrokerAdmin;
@@ -142,12 +138,9 @@ public class TransactionTest extends BrokerAdminUsingTestBase
 
             Thread.sleep(transactionTimeout + 1000);
 
-            ExecutionException e = receiveResponse(interaction, ExecutionException.class);
-            assertThat(e.getErrorCode(), is(equalTo(ExecutionErrorCode.RESOURCE_LIMIT_EXCEEDED)));
-            assertThat(e.getDescription(), containsString("transaction timed out"));
-
-            SessionDetach detach = receiveResponse(interaction, SessionDetach.class);
-            assertThat(detach.getName(), is(equalTo(sessionName)));
+            ConnectionClose close = receiveResponse(interaction, ConnectionClose.class);
+            assertThat(close.getReplyCode(), is(equalTo(ConnectionCloseCode.CONNECTION_FORCED)));
+            assertThat(close.getReplyText(), containsString("transaction timed out"));
 
             assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
         }
@@ -208,12 +201,9 @@ public class TransactionTest extends BrokerAdminUsingTestBase
 
             Thread.sleep(transactionTimeout + 1000);
 
-            ExecutionException e = receiveResponse(interaction, ExecutionException.class);
-            assertThat(e.getErrorCode(), is(equalTo(ExecutionErrorCode.RESOURCE_LIMIT_EXCEEDED)));
-            assertThat(e.getDescription(), containsString("transaction timed out"));
-
-            SessionDetach detach = receiveResponse(interaction, SessionDetach.class);
-            assertThat(detach.getName(), is(equalTo(sessionName)));
+            ConnectionClose close = receiveResponse(interaction, ConnectionClose.class);
+            assertThat(close.getReplyCode(), is(equalTo(ConnectionCloseCode.CONNECTION_FORCED)));
+            assertThat(close.getReplyText(), containsString("transaction timed out"));
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to