QPID-8091: [Broker-J] Add protocol tests for transaction timeout feature (cherry picked from commit c531ca0ac28e5fd457b4b114674867b3bd2ee093; merge conflicts were resolved manually)
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/f649224b Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/f649224b Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/f649224b Branch: refs/heads/7.0.x Commit: f649224bff05bbd65d7256d932c136d99159b411 Parents: 5b8587a Author: Alex Rudyy <oru...@apache.org> Authored: Wed Feb 7 23:48:05 2018 +0000 Committer: Alex Rudyy <oru...@apache.org> Committed: Mon Feb 19 18:14:03 2018 +0000 ---------------------------------------------------------------------- .../transaction/TransactionalTransferTest.java | 118 +++++++++++++++++++ .../apache/qpid/tests/utils/BrokerAdmin.java | 4 + .../utils/EmbeddedBrokerPerClassAdminImpl.java | 14 +++ .../utils/ExternalQpidBrokerAdminImpl.java | 12 ++ .../apache/qpid/tests/utils/QpidTestRunner.java | 18 ++- 5 files changed, 161 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f649224b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java index fb61974..a893d76 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.tests.protocol.v1_0.transaction; +import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J; 
import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; @@ -32,6 +33,7 @@ import java.net.InetSocketAddress; import java.util.Collections; import java.util.List; +import org.hamcrest.Matchers; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; @@ -62,6 +64,7 @@ import org.apache.qpid.tests.protocol.v1_0.Utils; import org.apache.qpid.tests.protocol.Response; import org.apache.qpid.tests.utils.BrokerAdmin; import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase; +import org.apache.qpid.tests.utils.BrokerSpecific; public class TransactionalTransferTest extends BrokerAdminUsingTestBase { @@ -644,6 +647,121 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase } } + @Test + @BrokerSpecific(kind = KIND_BROKER_J) + public void transactionalPostingTimeout() throws Exception + { + int transactionTimeout = 1000; + getBrokerAdmin().configure("storeTransactionOpenTimeoutClose", transactionTimeout); + + try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) + { + final UnsignedInteger linkHandle = UnsignedInteger.ONE; + + final Interaction interaction = transport.newInteraction(); + final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO); + Disposition responseDisposition = interaction.negotiateProtocol() + .consumeResponse() + .open() + .consumeResponse(Open.class) + .begin() + .consumeResponse(Begin.class) + + .txnAttachCoordinatorLink(txnState) + .txnDeclare(txnState) + + .attachRole(Role.SENDER) + .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME) + .attachHandle(linkHandle) + .attach().consumeResponse(Attach.class) + .consumeResponse(Flow.class) + + .transferHandle(linkHandle) + .transferPayloadData(TEST_MESSAGE_CONTENT) + .transferTransactionalState(txnState.getCurrentTransactionId()) + .transfer() + .consumeResponse(Disposition.class) + 
.getLatestResponse(Disposition.class); + + assertThat(responseDisposition.getRole(), is(Role.RECEIVER)); + assertThat(responseDisposition.getSettled(), is(Boolean.TRUE)); + assertThat(responseDisposition.getState(), is(instanceOf(TransactionalState.class))); + assertThat(((TransactionalState) responseDisposition.getState()).getOutcome(), is(instanceOf(Accepted.class))); + + Thread.sleep(transactionTimeout + 1000); + + Close responseClose = interaction.consumeResponse().getLatestResponse(Close.class); + assertThat(responseClose.getError(), is(Matchers.notNullValue())); + assertThat(responseClose.getError().getCondition(), equalTo(TransactionError.TRANSACTION_TIMEOUT)); + } + } + + @Test + @BrokerSpecific(kind = KIND_BROKER_J) + public void transactionalRetirementTimeout() throws Exception + { + int transactionTimeout = 1000; + getBrokerAdmin().configure("storeTransactionOpenTimeoutClose", transactionTimeout); + + getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT); + try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) + { + final Interaction interaction = transport.newInteraction(); + final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO); + interaction.negotiateProtocol() + .consumeResponse() + .open() + .consumeResponse(Open.class) + .begin() + .consumeResponse(Begin.class) + + .txnAttachCoordinatorLink(txnState) + .txnDeclare(txnState) + + .attachRole(Role.RECEIVER) + .attachHandle(UnsignedInteger.ONE) + .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME) + .attachRcvSettleMode(ReceiverSettleMode.FIRST) + .attach() + .consumeResponse(Attach.class) + + .flowIncomingWindow(UnsignedInteger.MAX_VALUE) + .flowNextIncomingId(UnsignedInteger.ZERO) + .flowOutgoingWindow(UnsignedInteger.ZERO) + .flowNextOutgoingId(UnsignedInteger.ZERO) + .flowLinkCredit(UnsignedInteger.MAX_VALUE) + .flowHandleFromLinkHandle() + .flow() + + .receiveDelivery() + 
.decodeLatestDelivery(); + + Object data = interaction.getDecodedLatestDelivery(); + assertThat(data, is(equalTo(TEST_MESSAGE_CONTENT))); + + interaction.dispositionSettled(true) + .dispositionRole(Role.RECEIVER) + .dispositionTransactionalState(txnState.getCurrentTransactionId(), new Accepted()) + .disposition() + .sync(); + + Thread.sleep(transactionTimeout + 1000); + Response<?> response = interaction.consumeResponse(Close.class, Flow.class).getLatestResponse(); + Close responseClose; + if (response.getBody() instanceof Close) + { + responseClose = (Close) response.getBody(); + } + else + { + responseClose = interaction.consumeResponse().getLatestResponse(Close.class); + } + assertThat(responseClose.getError(), is(Matchers.notNullValue())); + assertThat(responseClose.getError().getCondition(), equalTo(TransactionError.TRANSACTION_TIMEOUT)); + } + } + + private void assertUnknownTransactionIdError(final Response<?> response) { assertThat(response, is(notNullValue())); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f649224b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/BrokerAdmin.java ---------------------------------------------------------------------- diff --git a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/BrokerAdmin.java b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/BrokerAdmin.java index 3b57431..476e2b1 100644 --- a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/BrokerAdmin.java +++ b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/BrokerAdmin.java @@ -29,6 +29,7 @@ import org.apache.qpid.server.plugin.Pluggable; public interface BrokerAdmin extends Pluggable { + String KIND_BROKER_J = "broker-j"; String TEST_QUEUE_NAME = "testQueue"; Long RESTART_TIMEOUT = Long.getLong("brokerAdmin.restart_timeout", 10000); @@ -56,6 +57,9 @@ public interface BrokerAdmin extends Pluggable String getValidUsername(); String getValidPassword(); + String getKind(); 
+ + void configure(String settingName, Object settingValue); enum PortType http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f649224b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java ---------------------------------------------------------------------- diff --git a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java index 04e08a6..293df5b 100644 --- a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java +++ b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java @@ -24,8 +24,10 @@ import java.io.File; import java.lang.reflect.Method; import java.net.InetSocketAddress; import java.nio.file.Files; +import java.security.PrivilegedAction; import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Date; @@ -382,6 +384,18 @@ public class EmbeddedBrokerPerClassAdminImpl implements BrokerAdmin } @Override + public String getKind() + { + return KIND_BROKER_J; + } + + @Override + public void configure(final String settingName, final Object settingValue) + { + _currentVirtualHostNode.getVirtualHost().setAttributes(Collections.singletonMap(settingName, settingValue)); + } + + @Override public String getType() { return "EMBEDDED_BROKER_PER_CLASS"; http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f649224b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ExternalQpidBrokerAdminImpl.java ---------------------------------------------------------------------- diff --git a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ExternalQpidBrokerAdminImpl.java 
b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ExternalQpidBrokerAdminImpl.java index f359053..5f24546 100644 --- a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ExternalQpidBrokerAdminImpl.java +++ b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/ExternalQpidBrokerAdminImpl.java @@ -156,6 +156,18 @@ public class ExternalQpidBrokerAdminImpl implements BrokerAdmin } @Override + public String getKind() + { + return KIND_BROKER_J; + } + + @Override + public void configure(final String settingName, final Object settingValue) + { + throw new UnsupportedOperationException("External Qpid Broker does not support configuring"); + } + + @Override public String getType() { return "EXTERNAL_BROKER"; http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f649224b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/QpidTestRunner.java ---------------------------------------------------------------------- diff --git a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/QpidTestRunner.java b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/QpidTestRunner.java index 02cec34..d3c9be6 100644 --- a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/QpidTestRunner.java +++ b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/QpidTestRunner.java @@ -63,14 +63,22 @@ public class QpidTestRunner extends BlockJUnit4ClassRunner @Override protected void runChild(final FrameworkMethod method, final RunNotifier notifier) { - _brokerAdmin.beforeTestMethod(_testClass, method.getMethod()); - try + BrokerSpecific brokerSpecific = method.getAnnotation(BrokerSpecific.class); + if (brokerSpecific != null && !brokerSpecific.kind().equalsIgnoreCase(_brokerAdmin.getKind())) { - super.runChild(method, notifier); + notifier.fireTestIgnored(describeChild(method)); } - finally + else { - _brokerAdmin.afterTestMethod(_testClass, method.getMethod()); + 
_brokerAdmin.beforeTestMethod(_testClass, method.getMethod()); + try + { + super.runChild(method, notifier); + } + finally + { + _brokerAdmin.afterTestMethod(_testClass, method.getMethod()); + } } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org