QPID-8091: [Broker-J] Add protocol tests for transaction timeout feature

(cherry picked from commit c531ca0ac28e5fd457b4b114674867b3bd2ee093. Merge 
conflicts are resolved manually)


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/f649224b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/f649224b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/f649224b

Branch: refs/heads/7.0.x
Commit: f649224bff05bbd65d7256d932c136d99159b411
Parents: 5b8587a
Author: Alex Rudyy <oru...@apache.org>
Authored: Wed Feb 7 23:48:05 2018 +0000
Committer: Alex Rudyy <oru...@apache.org>
Committed: Mon Feb 19 18:14:03 2018 +0000

----------------------------------------------------------------------
 .../transaction/TransactionalTransferTest.java  | 118 +++++++++++++++++++
 .../apache/qpid/tests/utils/BrokerAdmin.java    |   4 +
 .../utils/EmbeddedBrokerPerClassAdminImpl.java  |  14 +++
 .../utils/ExternalQpidBrokerAdminImpl.java      |  12 ++
 .../apache/qpid/tests/utils/QpidTestRunner.java |  18 ++-
 5 files changed, 161 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f649224b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
----------------------------------------------------------------------
diff --git 
a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
 
b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
index fb61974..a893d76 100644
--- 
a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
+++ 
b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.tests.protocol.v1_0.transaction;
 
+import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -32,6 +33,7 @@ import java.net.InetSocketAddress;
 import java.util.Collections;
 import java.util.List;
 
+import org.hamcrest.Matchers;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -62,6 +64,7 @@ import org.apache.qpid.tests.protocol.v1_0.Utils;
 import org.apache.qpid.tests.protocol.Response;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.utils.BrokerSpecific;
 
 public class TransactionalTransferTest extends BrokerAdminUsingTestBase
 {
@@ -644,6 +647,121 @@ public class TransactionalTransferTest extends 
BrokerAdminUsingTestBase
         }
     }
 
+    @Test
+    @BrokerSpecific(kind = KIND_BROKER_J)
+    public void transactionalPostingTimeout() throws Exception
+    {
+        int transactionTimeout = 1000;
+            getBrokerAdmin().configure("storeTransactionOpenTimeoutClose", 
transactionTimeout);
+
+        try (FrameTransport transport = new 
FrameTransport(_brokerAddress).connect())
+        {
+            final UnsignedInteger linkHandle = UnsignedInteger.ONE;
+
+            final Interaction interaction = transport.newInteraction();
+            final InteractionTransactionalState txnState = 
interaction.createTransactionalState(UnsignedInteger.ZERO);
+            Disposition responseDisposition = interaction.negotiateProtocol()
+                                                         .consumeResponse()
+                                                         .open()
+                                                         
.consumeResponse(Open.class)
+                                                         .begin()
+                                                         
.consumeResponse(Begin.class)
+
+                                                         
.txnAttachCoordinatorLink(txnState)
+                                                         .txnDeclare(txnState)
+
+                                                         
.attachRole(Role.SENDER)
+                                                         
.attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                                                         
.attachHandle(linkHandle)
+                                                         
.attach().consumeResponse(Attach.class)
+                                                         
.consumeResponse(Flow.class)
+
+                                                         
.transferHandle(linkHandle)
+                                                         
.transferPayloadData(TEST_MESSAGE_CONTENT)
+                                                         
.transferTransactionalState(txnState.getCurrentTransactionId())
+                                                         .transfer()
+                                                         
.consumeResponse(Disposition.class)
+                                                         
.getLatestResponse(Disposition.class);
+
+            assertThat(responseDisposition.getRole(), is(Role.RECEIVER));
+            assertThat(responseDisposition.getSettled(), is(Boolean.TRUE));
+            assertThat(responseDisposition.getState(), 
is(instanceOf(TransactionalState.class)));
+            assertThat(((TransactionalState) 
responseDisposition.getState()).getOutcome(), is(instanceOf(Accepted.class)));
+
+            Thread.sleep(transactionTimeout + 1000);
+
+            Close responseClose = 
interaction.consumeResponse().getLatestResponse(Close.class);
+            assertThat(responseClose.getError(), is(Matchers.notNullValue()));
+            assertThat(responseClose.getError().getCondition(), 
equalTo(TransactionError.TRANSACTION_TIMEOUT));
+        }
+    }
+
+    @Test
+    @BrokerSpecific(kind = KIND_BROKER_J)
+    public void transactionalRetirementTimeout() throws Exception
+    {
+        int transactionTimeout = 1000;
+        getBrokerAdmin().configure("storeTransactionOpenTimeoutClose", 
transactionTimeout);
+
+        getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, 
TEST_MESSAGE_CONTENT);
+        try (FrameTransport transport = new 
FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            final InteractionTransactionalState txnState = 
interaction.createTransactionalState(UnsignedInteger.ZERO);
+            interaction.negotiateProtocol()
+                       .consumeResponse()
+                       .open()
+                       .consumeResponse(Open.class)
+                       .begin()
+                       .consumeResponse(Begin.class)
+
+                       .txnAttachCoordinatorLink(txnState)
+                       .txnDeclare(txnState)
+
+                       .attachRole(Role.RECEIVER)
+                       .attachHandle(UnsignedInteger.ONE)
+                       .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                       .attachRcvSettleMode(ReceiverSettleMode.FIRST)
+                       .attach()
+                       .consumeResponse(Attach.class)
+
+                       .flowIncomingWindow(UnsignedInteger.MAX_VALUE)
+                       .flowNextIncomingId(UnsignedInteger.ZERO)
+                       .flowOutgoingWindow(UnsignedInteger.ZERO)
+                       .flowNextOutgoingId(UnsignedInteger.ZERO)
+                       .flowLinkCredit(UnsignedInteger.MAX_VALUE)
+                       .flowHandleFromLinkHandle()
+                       .flow()
+
+                       .receiveDelivery()
+                       .decodeLatestDelivery();
+
+            Object data = interaction.getDecodedLatestDelivery();
+            assertThat(data, is(equalTo(TEST_MESSAGE_CONTENT)));
+
+            interaction.dispositionSettled(true)
+                       .dispositionRole(Role.RECEIVER)
+                       
.dispositionTransactionalState(txnState.getCurrentTransactionId(), new 
Accepted())
+                       .disposition()
+                       .sync();
+
+            Thread.sleep(transactionTimeout + 1000);
+            Response<?> response = interaction.consumeResponse(Close.class, 
Flow.class).getLatestResponse();
+            Close responseClose;
+            if (response.getBody() instanceof Close)
+            {
+                responseClose = (Close) response.getBody();
+            }
+            else
+            {
+                responseClose = 
interaction.consumeResponse().getLatestResponse(Close.class);
+            }
+            assertThat(responseClose.getError(), is(Matchers.notNullValue()));
+            assertThat(responseClose.getError().getCondition(), 
equalTo(TransactionError.TRANSACTION_TIMEOUT));
+        }
+    }
+
+
     private void assertUnknownTransactionIdError(final Response<?> response)
     {
         assertThat(response, is(notNullValue()));

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f649224b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/BrokerAdmin.java
----------------------------------------------------------------------
diff --git 
a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/BrokerAdmin.java
 
b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/BrokerAdmin.java
index 3b57431..476e2b1 100644
--- 
a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/BrokerAdmin.java
+++ 
b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/BrokerAdmin.java
@@ -29,6 +29,7 @@ import org.apache.qpid.server.plugin.Pluggable;
 
 public interface BrokerAdmin extends Pluggable
 {
+    String KIND_BROKER_J = "broker-j";
     String TEST_QUEUE_NAME = "testQueue";
     Long RESTART_TIMEOUT = Long.getLong("brokerAdmin.restart_timeout", 10000);
 
@@ -56,6 +57,9 @@ public interface BrokerAdmin extends Pluggable
     String getValidUsername();
     String getValidPassword();
 
+    String getKind();
+
+    void configure(String settingName, Object settingValue);
 
 
     enum PortType

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f649224b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java
----------------------------------------------------------------------
diff --git 
a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java
 
b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java
index 04e08a6..293df5b 100644
--- 
a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java
+++ 
b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java
@@ -24,8 +24,10 @@ import java.io.File;
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
 import java.nio.file.Files;
+import java.security.PrivilegedAction;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
@@ -382,6 +384,18 @@ public class EmbeddedBrokerPerClassAdminImpl implements 
BrokerAdmin
     }
 
     @Override
+    public String getKind()
+    {
+        return KIND_BROKER_J;
+    }
+
+    @Override
+    public void configure(final String settingName, final Object settingValue)
+    {
+        
_currentVirtualHostNode.getVirtualHost().setAttributes(Collections.singletonMap(settingName,
 settingValue));
+    }
+
+    @Override
     public String getType()
     {
         return "EMBEDDED_BROKER_PER_CLASS";

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f649224b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ExternalQpidBrokerAdminImpl.java
----------------------------------------------------------------------
diff --git 
a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ExternalQpidBrokerAdminImpl.java
 
b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ExternalQpidBrokerAdminImpl.java
index f359053..5f24546 100644
--- 
a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ExternalQpidBrokerAdminImpl.java
+++ 
b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ExternalQpidBrokerAdminImpl.java
@@ -156,6 +156,18 @@ public class ExternalQpidBrokerAdminImpl implements 
BrokerAdmin
     }
 
     @Override
+    public String getKind()
+    {
+        return KIND_BROKER_J;
+    }
+
+    @Override
+    public void configure(final String settingName, final Object settingValue)
+    {
+        throw new UnsupportedOperationException("External Qpid Broker does not 
support configuring");
+    }
+
+    @Override
     public String getType()
     {
         return "EXTERNAL_BROKER";

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f649224b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/QpidTestRunner.java
----------------------------------------------------------------------
diff --git 
a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/QpidTestRunner.java
 
b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/QpidTestRunner.java
index 02cec34..d3c9be6 100644
--- 
a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/QpidTestRunner.java
+++ 
b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/QpidTestRunner.java
@@ -63,14 +63,22 @@ public class QpidTestRunner extends BlockJUnit4ClassRunner
     @Override
     protected void runChild(final FrameworkMethod method, final RunNotifier 
notifier)
     {
-        _brokerAdmin.beforeTestMethod(_testClass, method.getMethod());
-        try
+        BrokerSpecific brokerSpecific = 
method.getAnnotation(BrokerSpecific.class);
+        if (brokerSpecific != null && 
!brokerSpecific.kind().equalsIgnoreCase(_brokerAdmin.getKind()))
         {
-            super.runChild(method, notifier);
+            notifier.fireTestIgnored(describeChild(method));
         }
-        finally
+        else
         {
-            _brokerAdmin.afterTestMethod(_testClass, method.getMethod());
+            _brokerAdmin.beforeTestMethod(_testClass, method.getMethod());
+            try
+            {
+                super.runChild(method, notifier);
+            }
+            finally
+            {
+                _brokerAdmin.afterTestMethod(_testClass, method.getMethod());
+            }
         }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to