This is an automated email from the ASF dual-hosted git repository. orudyy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
commit a01721df23ee30e3c194afffab549ec55638dcd8 Author: Alex Rudyy <[email protected]> AuthorDate: Tue Aug 20 16:05:54 2019 +0100 QPID-8349: [Tests][AMQP 1.0] Simplify transaction tests by moving InteractionTransactionalState from tests into Interaction --- .../qpid/tests/protocol/v1_0/Interaction.java | 75 ++++++++---- .../anonymousterminus/AnonymousTerminusTest.java | 69 +++++------ .../extensions/qpid/queue/QueueDeletionTest.java | 28 ++--- .../transactiontimeout/TransactionTimeoutTest.java | 15 +-- .../protocol/v1_0/messaging/TransferTest.java | 19 ++-- .../protocol/v1_0/transaction/DischargeTest.java | 63 +++++------ .../transaction/TransactionalTransferTest.java | 126 +++++++++------------ 7 files changed, 191 insertions(+), 204 deletions(-) diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java index ab2c959..727b501 100644 --- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java +++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java @@ -112,6 +112,7 @@ public class Interaction extends AbstractInteraction<Interaction> private Map<Class, FrameBody> _latestResponses = new HashMap<>(); private AtomicLong _receivedDeliveryCount = new AtomicLong(); private AtomicLong _coordinatorCredits = new AtomicLong(); + private InteractionTransactionalState _transactionalState; Interaction(final FrameTransport frameTransport) { @@ -811,6 +812,11 @@ public class Interaction extends AbstractInteraction<Interaction> return transferState(transactionalState); } + public Interaction transferTransactionalStateFromCurrentTransaction() + { + return transferTransactionalState(getCurrentTransactionId()); + } + public Interaction transferResume(final Boolean resume) { _transfer.setResume(resume); @@ -901,6 +907,11 @@ public class Interaction extends AbstractInteraction<Interaction> return dispositionState(state); } + public Interaction dispositionTransactionalStateFromCurrentTransaction(final Outcome outcome) + { + return dispositionTransactionalState(getCurrentTransactionId(), outcome); + } + public Interaction dispositionRole(final Role role) { _disposition.setRole(role); @@ -948,23 +959,40 @@ public class Interaction extends AbstractInteraction<Interaction> // transaction // //////////////// - public Interaction txnAttachCoordinatorLink(InteractionTransactionalState transactionalState) throws Exception + + public UnsignedInteger getCoordinatorHandle() + { + return _transactionalState == null ? null : _transactionalState.getHandle(); + } + + public Binary getCurrentTransactionId() + { + return _transactionalState == null ? null : _transactionalState.getCurrentTransactionId(); + } + + public DeliveryState getCoordinatorLatestDeliveryState() { - return txnAttachCoordinatorLink(transactionalState, Accepted.ACCEPTED_SYMBOL, Rejected.REJECTED_SYMBOL); + return _transactionalState == null ? null : _transactionalState.getDeliveryState(); } - public Interaction txnAttachCoordinatorLink(final InteractionTransactionalState transactionalState, + public Interaction txnAttachCoordinatorLink(final UnsignedInteger handle) throws Exception + { + return txnAttachCoordinatorLink(handle, Accepted.ACCEPTED_SYMBOL, Rejected.REJECTED_SYMBOL); + } + + public Interaction txnAttachCoordinatorLink(final UnsignedInteger handle, final Symbol... outcomes) throws Exception { Attach attach = new Attach(); - attach.setName("testTransactionCoordinator-" + transactionalState.getHandle()); - attach.setHandle(transactionalState.getHandle()); + attach.setName("testTransactionCoordinator-" + handle); + attach.setHandle(handle); attach.setInitialDeliveryCount(UnsignedInteger.ZERO); attach.setTarget(new Coordinator()); attach.setRole(Role.SENDER); Source source = new Source(); attach.setSource(source); source.setOutcomes(outcomes); + _transactionalState = new InteractionTransactionalState(handle); sendPerformativeAndChainFuture(attach, _sessionChannel); consumeResponse(Attach.class); final Flow flow = consumeResponse(Flow.class).getLatestResponse(Flow.class); @@ -972,32 +1000,42 @@ public class Interaction extends AbstractInteraction<Interaction> return this; } - public Interaction txnDeclare(final InteractionTransactionalState txnState) throws Exception + public Interaction txnDeclare() throws Exception { - sendPayloadToCoordinator(new Declare(), txnState.getHandle()); + sendPayloadToCoordinator(new Declare(), _transactionalState.getHandle()); final DeliveryState state = handleCoordinatorResponse(); - txnState.setDeliveryState(state); + _transactionalState.setDeliveryState(state); final Binary transactionId = ((Declared) state).getTxnId(); - txnState.setLastTransactionId(transactionId); + _transactionalState.setLastTransactionId(transactionId); return this; } - public Interaction txnSendDischarge(final InteractionTransactionalState txnState, final boolean failed) + public Interaction txnSendDischarge(final boolean failed) + throws Exception + { + return txnSendDischarge(_transactionalState.getCurrentTransactionId(), failed); + } + + public Interaction txnSendDischarge(Binary transactionId, final boolean failed) throws Exception { final Discharge discharge = new Discharge(); - discharge.setTxnId(txnState.getCurrentTransactionId()); + discharge.setTxnId(transactionId); discharge.setFail(failed); - sendPayloadToCoordinator(discharge, txnState.getHandle()); + sendPayloadToCoordinator(discharge, _transactionalState.getHandle()); return this; } - public Interaction txnDischarge(final InteractionTransactionalState txnState, boolean failed) throws Exception + public Interaction txnDischarge(boolean failed) throws Exception { - txnSendDischarge(txnState, failed); + return txnDischarge(_transactionalState.getCurrentTransactionId(), failed); + } + + public Interaction txnDischarge(Binary transactionId, boolean failed) throws Exception + { + txnSendDischarge(transactionId, failed); final DeliveryState state = handleCoordinatorResponse(); - txnState.setDeliveryState(state); - txnState.setLastTransactionId(null); + _transactionalState.setDeliveryState(state); return this; } @@ -1186,11 +1224,6 @@ public class Interaction extends AbstractInteraction<Interaction> return transfers; } - public InteractionTransactionalState createTransactionalState(final UnsignedInteger handle) - { - return new InteractionTransactionalState(handle); - } - /////////// // Empty // /////////// diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousterminus/AnonymousTerminusTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousterminus/AnonymousTerminusTest.java index a7ef076..582e7de 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousterminus/AnonymousTerminusTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousterminus/AnonymousTerminusTest.java @@ -61,7 +61,6 @@ import org.apache.qpid.tests.protocol.Response; import org.apache.qpid.tests.protocol.SpecificationTest; import org.apache.qpid.tests.protocol.v1_0.FrameTransport; import org.apache.qpid.tests.protocol.v1_0.Interaction; -import org.apache.qpid.tests.protocol.v1_0.InteractionTransactionalState; import org.apache.qpid.tests.protocol.v1_0.MessageEncoder; import org.apache.qpid.tests.protocol.v1_0.Utils; import org.apache.qpid.tests.utils.BrokerAdmin; @@ -252,12 +251,11 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase { final Interaction interaction = openInteractionWithAnonymousRelayCapability(transport); final UnsignedInteger linkHandle = UnsignedInteger.ONE; - final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO); interaction.begin() .consumeResponse(Begin.class) - .txnAttachCoordinatorLink(txnState) - .txnDeclare(txnState) + .txnAttachCoordinatorLink(UnsignedInteger.ZERO) + .txnDeclare() .attachRole(Role.SENDER) .attachHandle(linkHandle) @@ -268,13 +266,11 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase .transferHandle(linkHandle) .transferPayload(generateMessagePayloadToDestination(BrokerAdmin.TEST_QUEUE_NAME)) .transferDeliveryTag(_deliveryTag) - .transferTransactionalState(txnState.getCurrentTransactionId()) + .transferTransactionalStateFromCurrentTransaction() .transferSettled(Boolean.TRUE) - .transfer() - - .txnDischarge(txnState, false); + .transfer().txnDischarge(false); - assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class))); + assertThat(interaction.getCoordinatorLatestDeliveryState(), is(instanceOf(Accepted.class))); Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME); assertThat(receivedMessage, is(equalTo(TEST_MESSAGE_CONTENT))); @@ -288,12 +284,11 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase { final Interaction interaction = openInteractionWithAnonymousRelayCapability(transport); final UnsignedInteger linkHandle = UnsignedInteger.ONE; - final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO); interaction.begin() .consumeResponse(Begin.class) - .txnAttachCoordinatorLink(txnState) - .txnDeclare(txnState) + .txnAttachCoordinatorLink(UnsignedInteger.ZERO) + .txnDeclare() .attachRole(Role.SENDER) .attachHandle(linkHandle) @@ -304,7 +299,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase .transferHandle(linkHandle) .transferPayload(generateMessagePayloadToDestination(BrokerAdmin.TEST_QUEUE_NAME)) .transferDeliveryTag(_deliveryTag) - .transferTransactionalState(txnState.getCurrentTransactionId()) + .transferTransactionalStateFromCurrentTransaction() .transferSettled(Boolean.FALSE) .transfer(); @@ -318,9 +313,9 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase final TransactionalState receivedTxnState = (TransactionalState) dispositionState; assertThat(receivedTxnState.getOutcome(), is(instanceOf(Accepted.class))); - interaction.txnDischarge(txnState, false); + interaction.txnDischarge(false); - assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class))); + assertThat(interaction.getCoordinatorLatestDeliveryState(), is(instanceOf(Accepted.class))); Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME); assertThat(receivedMessage, is(equalTo(TEST_MESSAGE_CONTENT))); @@ -334,12 +329,11 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase { final Interaction interaction = openInteractionWithAnonymousRelayCapability(transport); final UnsignedInteger linkHandle = UnsignedInteger.ONE; - final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO); interaction.begin() .consumeResponse(Begin.class) - .txnAttachCoordinatorLink(txnState) - .txnDeclare(txnState) + .txnAttachCoordinatorLink(UnsignedInteger.ZERO) + .txnDeclare() .attachRole(Role.SENDER) .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL, Rejected.REJECTED_SYMBOL) @@ -351,7 +345,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase .transferHandle(linkHandle) .transferPayload(generateMessagePayloadToDestination("Unknown")) .transferDeliveryTag(_deliveryTag) - .transferTransactionalState(txnState.getCurrentTransactionId()) + .transferTransactionalStateFromCurrentTransaction() .transferSettled(Boolean.FALSE) .transfer(); @@ -370,9 +364,9 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase assertThat(rejectedError.getInfo(), is(notNullValue())); assertThat(rejectedError.getInfo().get(DELIVERY_TAG), is(equalTo(_deliveryTag))); - interaction.txnDischarge(txnState, false); + interaction.txnDischarge(false); - assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class))); + assertThat(interaction.getCoordinatorLatestDeliveryState(), is(instanceOf(Accepted.class))); } } @@ -383,12 +377,11 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase { final Interaction interaction = openInteractionWithAnonymousRelayCapability(transport); final UnsignedInteger linkHandle = UnsignedInteger.ONE; - final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO); interaction.begin() .consumeResponse(Begin.class) - .txnAttachCoordinatorLink(txnState) - .txnDeclare(txnState) + .txnAttachCoordinatorLink(UnsignedInteger.ZERO) + .txnDeclare() .attachRole(Role.SENDER) .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL) @@ -401,7 +394,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase .transferPayload(generateMessagePayloadToDestination("Unknown")) .transferDeliveryId(UnsignedInteger.valueOf(1)) .transferDeliveryTag(_deliveryTag) - .transferTransactionalState(txnState.getCurrentTransactionId()) + .transferTransactionalStateFromCurrentTransaction() .transferSettled(Boolean.FALSE) .transfer(); @@ -412,9 +405,9 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase assertThat(senderLinkDetachError.getInfo(), is(notNullValue())); assertThat(senderLinkDetachError.getInfo().get(DELIVERY_TAG), is(equalTo(_deliveryTag))); - interaction.txnDischarge(txnState, false); + interaction.txnDischarge(false); - DeliveryState txnDischargeDeliveryState = txnState.getDeliveryState(); + DeliveryState txnDischargeDeliveryState = interaction.getCoordinatorLatestDeliveryState(); assertThat(txnDischargeDeliveryState, is(instanceOf(Rejected.class))); Rejected rejected = (Rejected) txnDischargeDeliveryState; Error error = rejected.getError(); @@ -454,13 +447,12 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase final Interaction interaction = openInteractionWithAnonymousRelayCapability(transport); - final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO); interaction.begin() .consumeResponse(Begin.class) // attaching coordinator link with supported outcomes Accepted and Rejected - .txnAttachCoordinatorLink(txnState) - .txnDeclare(txnState) + .txnAttachCoordinatorLink(UnsignedInteger.ZERO) + .txnDeclare() .attachRole(Role.SENDER) .attachHandle(linkHandle) @@ -472,13 +464,13 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase .transferHandle(linkHandle) .transferPayload(generateMessagePayloadToDestination("Unknown")) .transferDeliveryTag(_deliveryTag) - .transferTransactionalState(txnState.getCurrentTransactionId()) + .transferTransactionalStateFromCurrentTransaction() .transferSettled(Boolean.TRUE) .transfer(); - interaction.txnDischarge(txnState, false); + interaction.txnDischarge(false); - DeliveryState txDischargeDeliveryState = txnState.getDeliveryState(); + DeliveryState txDischargeDeliveryState = interaction.getCoordinatorLatestDeliveryState(); assertThat(txDischargeDeliveryState, is(instanceOf(Rejected.class))); Rejected rejected = (Rejected) txDischargeDeliveryState; @@ -521,13 +513,11 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase final Interaction interaction = openInteractionWithAnonymousRelayCapability(transport); - final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO); - interaction.begin() .consumeResponse(Begin.class) - .txnAttachCoordinatorLink(txnState, Accepted.ACCEPTED_SYMBOL) - .txnDeclare(txnState) + .txnAttachCoordinatorLink(UnsignedInteger.ZERO, Accepted.ACCEPTED_SYMBOL) + .txnDeclare() .attachRole(Role.SENDER) .attachHandle(linkHandle) @@ -541,10 +531,9 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase .transferHandle(linkHandle) .transferPayload(generateMessagePayloadToDestination("Unknown")) .transferDeliveryTag(_deliveryTag) - .transferTransactionalState(txnState.getCurrentTransactionId()) + .transferTransactionalStateFromCurrentTransaction() .transferSettled(Boolean.TRUE) - .transfer() - .txnSendDischarge(txnState, false); + .transfer().txnSendDischarge(false); Detach transactionCoordinatorDetach = interaction.consumeResponse().getLatestResponse(Detach.class); Error transactionCoordinatorDetachError = transactionCoordinatorDetach.getError(); diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/queue/QueueDeletionTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/queue/QueueDeletionTest.java index 39a8a9b..8cd435b 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/queue/QueueDeletionTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/queue/QueueDeletionTest.java @@ -51,7 +51,6 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Role; import org.apache.qpid.tests.protocol.Response; import org.apache.qpid.tests.protocol.v1_0.FrameTransport; import org.apache.qpid.tests.protocol.v1_0.Interaction; -import org.apache.qpid.tests.protocol.v1_0.InteractionTransactionalState; import org.apache.qpid.tests.protocol.v1_0.Utils; import org.apache.qpid.tests.utils.BrokerAdmin; import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase; @@ -132,7 +131,6 @@ public class QueueDeletionTest extends BrokerAdminUsingTestBase final UnsignedInteger linkHandle = UnsignedInteger.ONE; final Interaction interaction = transport.newInteraction(); - final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO); Attach attach = interaction.negotiateProtocol() .consumeResponse() @@ -141,8 +139,8 @@ public class QueueDeletionTest extends BrokerAdminUsingTestBase .begin() .consumeResponse(Begin.class) - .txnAttachCoordinatorLink(txnState) - .txnDeclare(txnState) + .txnAttachCoordinatorLink(UnsignedInteger.ZERO) + .txnDeclare() .attachRole(Role.SENDER) .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME) @@ -153,7 +151,7 @@ public class QueueDeletionTest extends BrokerAdminUsingTestBase .transferHandle(linkHandle) .transferPayloadData(getTestName()) - .transferTransactionalState(txnState.getCurrentTransactionId()) + .transferTransactionalStateFromCurrentTransaction() .transfer() .consumeResponse(Disposition.class) .getLatestResponse(Disposition.class); @@ -171,9 +169,9 @@ public class QueueDeletionTest extends BrokerAdminUsingTestBase assertThat(receivedDetach.getError().getCondition(), is(AmqpError.RESOURCE_DELETED)); assertThat(receivedDetach.getHandle(), is(equalTo(attach.getHandle()))); - interaction.txnSendDischarge(txnState, false); + interaction.txnSendDischarge(false); - assertTransactionRollbackOnly(interaction, txnState); + assertTransactionRollbackOnly(interaction); } } @@ -187,7 +185,6 @@ public class QueueDeletionTest extends BrokerAdminUsingTestBase try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { final Interaction interaction = transport.newInteraction(); - final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO); Attach attach = interaction.negotiateProtocol() .consumeResponse() .open() @@ -195,8 +192,8 @@ public class QueueDeletionTest extends BrokerAdminUsingTestBase .begin() .consumeResponse(Begin.class) - .txnAttachCoordinatorLink(txnState) - .txnDeclare(txnState) + .txnAttachCoordinatorLink(UnsignedInteger.ZERO) + .txnDeclare() .attachRole(Role.RECEIVER) .attachHandle(UnsignedInteger.ONE) @@ -221,7 +218,7 @@ public class QueueDeletionTest extends BrokerAdminUsingTestBase interaction.dispositionSettled(true) .dispositionRole(Role.RECEIVER) - .dispositionTransactionalState(txnState.getCurrentTransactionId(), new Accepted()) + .dispositionTransactionalStateFromCurrentTransaction(new Accepted()) .disposition(); interaction.flowIncomingWindow(UnsignedInteger.valueOf(2)) @@ -244,14 +241,13 @@ public class QueueDeletionTest extends BrokerAdminUsingTestBase assertThat(receivedDetach.getError().getCondition(), is(AmqpError.RESOURCE_DELETED)); assertThat(receivedDetach.getHandle(), is(equalTo(attach.getHandle()))); - interaction.txnSendDischarge(txnState, false); + interaction.txnSendDischarge(false); - assertTransactionRollbackOnly(interaction, txnState); + assertTransactionRollbackOnly(interaction); } } - private void assertTransactionRollbackOnly(final Interaction interaction, - final InteractionTransactionalState txnState) throws Exception + private void assertTransactionRollbackOnly(final Interaction interaction) throws Exception { Disposition declareTransactionDisposition = null; Flow coordinatorFlow = null; @@ -266,7 +262,7 @@ public class QueueDeletionTest extends BrokerAdminUsingTestBase if (response.getBody() instanceof Flow) { final Flow flowResponse = (Flow) response.getBody(); - if (flowResponse.getHandle().equals(txnState.getHandle())) + if (flowResponse.getHandle().equals(interaction.getCoordinatorHandle())) { coordinatorFlow = flowResponse; } diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/transactiontimeout/TransactionTimeoutTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/transactiontimeout/TransactionTimeoutTest.java index cd4f896..85dff52 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/transactiontimeout/TransactionTimeoutTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/transactiontimeout/TransactionTimeoutTest.java @@ -46,7 +46,6 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Role; import org.apache.qpid.tests.protocol.Response; import org.apache.qpid.tests.protocol.v1_0.FrameTransport; import org.apache.qpid.tests.protocol.v1_0.Interaction; -import org.apache.qpid.tests.protocol.v1_0.InteractionTransactionalState; import org.apache.qpid.tests.protocol.v1_0.Utils; import org.apache.qpid.tests.utils.BrokerAdmin; import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase; @@ -74,7 +73,6 @@ public class TransactionTimeoutTest extends BrokerAdminUsingTestBase final UnsignedInteger linkHandle = UnsignedInteger.ONE; final Interaction interaction = transport.newInteraction(); - final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO); Disposition responseDisposition = interaction.negotiateProtocol() .consumeResponse() .open() @@ -82,8 +80,8 @@ public class TransactionTimeoutTest extends BrokerAdminUsingTestBase .begin() .consumeResponse(Begin.class) - .txnAttachCoordinatorLink(txnState) - .txnDeclare(txnState) + .txnAttachCoordinatorLink(UnsignedInteger.ZERO) + .txnDeclare() .attachRole(Role.SENDER) .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME) @@ -93,7 +91,7 @@ public class TransactionTimeoutTest extends BrokerAdminUsingTestBase .transferHandle(linkHandle) .transferPayloadData(getTestName()) - .transferTransactionalState(txnState.getCurrentTransactionId()) + .transferTransactionalStateFromCurrentTransaction() .transfer() .consumeResponse(Disposition.class) .getLatestResponse(Disposition.class); @@ -116,7 +114,6 @@ public class TransactionTimeoutTest extends BrokerAdminUsingTestBase try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { final Interaction interaction = transport.newInteraction(); - final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO); interaction.negotiateProtocol() .consumeResponse() .open() @@ -124,8 +121,8 @@ public class TransactionTimeoutTest extends BrokerAdminUsingTestBase .begin() .consumeResponse(Begin.class) - .txnAttachCoordinatorLink(txnState) - .txnDeclare(txnState) + .txnAttachCoordinatorLink(UnsignedInteger.ZERO) + .txnDeclare() .attachRole(Role.RECEIVER) .attachHandle(UnsignedInteger.ONE) @@ -150,7 +147,7 @@ public class TransactionTimeoutTest extends BrokerAdminUsingTestBase interaction.dispositionSettled(true) .dispositionRole(Role.RECEIVER) - .dispositionTransactionalState(txnState.getCurrentTransactionId(), new Accepted()) + .dispositionTransactionalStateFromCurrentTransaction(new Accepted()) .disposition() .sync(); diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java index 42f5dfc..dab344e 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java @@ -83,7 +83,6 @@ import org.apache.qpid.tests.protocol.Response; import org.apache.qpid.tests.protocol.SpecificationTest; import org.apache.qpid.tests.protocol.v1_0.FrameTransport; import org.apache.qpid.tests.protocol.v1_0.Interaction; -import org.apache.qpid.tests.protocol.v1_0.InteractionTransactionalState; import org.apache.qpid.tests.protocol.v1_0.MessageDecoder; import org.apache.qpid.tests.protocol.v1_0.MessageEncoder; import org.apache.qpid.tests.protocol.v1_0.Utils; @@ -1056,9 +1055,8 @@ public class TransferTest extends BrokerAdminUsingTestBase Flow flow = interaction.getLatestResponse(Flow.class); assumeThat("insufficient credit for the test", flow.getLinkCredit().intValue(), is(greaterThan(2))); - final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ONE); - interaction.txnAttachCoordinatorLink(txnState) - .txnDeclare(txnState); + interaction.txnAttachCoordinatorLink(UnsignedInteger.ONE) + .txnDeclare(); interaction.transferDeliveryId() .transferDeliveryTag(new Binary("A".getBytes(StandardCharsets.UTF_8))) @@ -1070,11 +1068,11 @@ public class TransferTest extends BrokerAdminUsingTestBase .transfer() .transferDeliveryId() .transferDeliveryTag(new Binary("C".getBytes(StandardCharsets.UTF_8))) - .transferTransactionalState(txnState.getCurrentTransactionId()) + .transferTransactionalStateFromCurrentTransaction() .transferPayloadData(contents[2]) .transfer(); - interaction.txnSendDischarge(txnState, false); + interaction.txnSendDischarge(false); assertDeliveries(interaction, Sets.newTreeSet(Arrays.asList(UnsignedInteger.ONE, UnsignedInteger.valueOf(2), @@ -1174,9 +1172,8 @@ public class TransferTest extends BrokerAdminUsingTestBase deliveryIds.add(interaction.getLatestDeliveryId()); } - final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ONE); - interaction.txnAttachCoordinatorLink(txnState) - .txnDeclare(txnState); + interaction.txnAttachCoordinatorLink(UnsignedInteger.ONE) + .txnDeclare(); interaction.dispositionSettled(true) .dispositionRole(Role.RECEIVER) @@ -1188,10 +1185,10 @@ public class TransferTest extends BrokerAdminUsingTestBase .dispositionRole(Role.RECEIVER) .dispositionFirst(deliveryIds.get(2)) .dispositionLast(deliveryIds.get(3)) - .dispositionTransactionalState(txnState.getCurrentTransactionId(), new Accepted()) + .dispositionTransactionalStateFromCurrentTransaction(new Accepted()) .disposition(); - interaction.txnDischarge(txnState, false); + interaction.txnDischarge(false); } String messageText = getTestName() + "_" + 4; diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java index 85040c8..369bd9b 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java @@ -55,7 +55,6 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer; import org.apache.qpid.tests.protocol.SpecificationTest; import org.apache.qpid.tests.protocol.v1_0.FrameTransport; import org.apache.qpid.tests.protocol.v1_0.Interaction; -import org.apache.qpid.tests.protocol.v1_0.InteractionTransactionalState; import org.apache.qpid.tests.protocol.v1_0.Utils; import org.apache.qpid.tests.utils.BrokerAdmin; import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase; @@ -81,24 +80,22 @@ public class DischargeTest extends BrokerAdminUsingTestBase { try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { - final InteractionTransactionalState txnState = new InteractionTransactionalState(UnsignedInteger.ZERO); final Interaction interaction = transport.newInteraction(); interaction.negotiateProtocol().consumeResponse() .open().consumeResponse(Open.class) .begin().consumeResponse(Begin.class) - .txnAttachCoordinatorLink(txnState, Rejected.REJECTED_SYMBOL) - .txnDeclare(txnState); + .txnAttachCoordinatorLink(UnsignedInteger.ZERO, Rejected.REJECTED_SYMBOL) + .txnDeclare(); - assertThat(txnState.getDeliveryState(), is(instanceOf(Declared.class))); - assertThat(txnState.getCurrentTransactionId(), is(notNullValue())); + assertThat(interaction.getCoordinatorLatestDeliveryState(), is(instanceOf(Declared.class))); + assertThat(interaction.getCurrentTransactionId(), is(notNullValue())); - txnState.setLastTransactionId(new Binary("nonExistingTransaction".getBytes(UTF_8))); - interaction.txnDischarge(txnState, false); + interaction.txnDischarge(new Binary("nonExistingTransaction".getBytes(UTF_8)), false); interaction.doCloseConnection(); - assertThat(txnState.getDeliveryState(), is(instanceOf(Rejected.class))); - final Error error = ((Rejected) txnState.getDeliveryState()).getError(); + assertThat(interaction.getCoordinatorLatestDeliveryState(), is(instanceOf(Rejected.class))); + final Error error = ((Rejected) interaction.getCoordinatorLatestDeliveryState()).getError(); assertThat(error, is(notNullValue())); if (KIND_BROKER_J.equals(getBrokerAdmin().getKind())) @@ -119,19 +116,17 @@ public class DischargeTest extends BrokerAdminUsingTestBase { try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { - final InteractionTransactionalState txnState = new InteractionTransactionalState(UnsignedInteger.ZERO); final Interaction interaction = transport.newInteraction(); interaction.negotiateProtocol().consumeResponse() .open().consumeResponse(Open.class) .begin().consumeResponse(Begin.class) - .txnAttachCoordinatorLink(txnState, Accepted.ACCEPTED_SYMBOL) - .txnDeclare(txnState); + .txnAttachCoordinatorLink(UnsignedInteger.ZERO, Accepted.ACCEPTED_SYMBOL) + .txnDeclare(); - assertThat(txnState.getDeliveryState(), is(instanceOf(Declared.class))); - assertThat(txnState.getCurrentTransactionId(), is(notNullValue())); + assertThat(interaction.getCoordinatorLatestDeliveryState(), is(instanceOf(Declared.class))); + assertThat(interaction.getCurrentTransactionId(), is(notNullValue())); - txnState.setLastTransactionId(new Binary("nonExistingTransaction".getBytes(UTF_8))); - interaction.txnSendDischarge(txnState, false); + interaction.txnSendDischarge(new Binary("nonExistingTransaction".getBytes(UTF_8)), false); final Detach detachResponse = interaction.consumeResponse(Detach.class).getLatestResponse(Detach.class); interaction.doCloseConnection(); @@ -157,13 +152,12 @@ public class DischargeTest extends BrokerAdminUsingTestBase try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { final Interaction interaction = transport.newInteraction(); - final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO); interaction.negotiateProtocol().consumeResponse() .open().consumeResponse(Open.class) .begin().consumeResponse(Begin.class) - .txnAttachCoordinatorLink(txnState) - .txnDeclare(txnState) + .txnAttachCoordinatorLink(UnsignedInteger.ZERO) + .txnDeclare() .attachRole(Role.RECEIVER) .attachHandle(UnsignedInteger.ONE) @@ -187,12 +181,11 @@ public class DischargeTest extends BrokerAdminUsingTestBase final UnsignedInteger deliveryId = transfers.get(0).getDeliveryId(); interaction.detach().consumeResponse(Detach.class) .dispositionFirst(deliveryId) - .dispositionTransactionalState(txnState.getCurrentTransactionId(), new Accepted()) + .dispositionTransactionalStateFromCurrentTransaction(new Accepted()) .dispositionRole(Role.RECEIVER) - .disposition() - .txnDischarge(txnState, false); + .disposition().txnDischarge(false); - assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class))); + assertThat(interaction.getCoordinatorLatestDeliveryState(), is(instanceOf(Accepted.class))); interaction.doCloseConnection(); } @@ -217,13 +210,12 @@ public class DischargeTest extends BrokerAdminUsingTestBase try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { final Interaction interaction = transport.newInteraction(); - final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO); interaction.negotiateProtocol().consumeResponse() .open().consumeResponse(Open.class) .begin().consumeResponse(Begin.class) - .txnAttachCoordinatorLink(txnState) - .txnDeclare(txnState) + .txnAttachCoordinatorLink(UnsignedInteger.ZERO) + .txnDeclare() .attachRole(Role.SENDER) .attachHandle(UnsignedInteger.ONE) @@ -232,16 +224,16 @@ public class DischargeTest extends BrokerAdminUsingTestBase .consumeResponse(Flow.class) .transferDeliveryId() - .transferTransactionalState(txnState.getCurrentTransactionId()) + .transferTransactionalStateFromCurrentTransaction() .transferPayloadData(getTestName()) .transferHandle(UnsignedInteger.ONE) .transfer().consumeResponse(Disposition.class) .detachHandle(UnsignedInteger.ONE) .detach().consumeResponse(Detach.class); - interaction.txnDischarge(txnState, false); + interaction.txnDischarge(false); - assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class))); + assertThat(interaction.getCoordinatorLatestDeliveryState(), is(instanceOf(Accepted.class))); } final Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME); @@ -259,13 +251,12 @@ public class DischargeTest extends BrokerAdminUsingTestBase try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { final Interaction interaction = transport.newInteraction(); - final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO); interaction.negotiateProtocol().consumeResponse() .open().consumeResponse(Open.class) .begin().consumeResponse(Begin.class) - .txnAttachCoordinatorLink(txnState) - .txnDeclare(txnState) + .txnAttachCoordinatorLink(UnsignedInteger.ZERO) + .txnDeclare() .attachRole(Role.SENDER) .attachHandle(UnsignedInteger.ONE) @@ -275,7 +266,7 @@ public class DischargeTest extends BrokerAdminUsingTestBase .consumeResponse(Flow.class) .transferDeliveryId() - .transferTransactionalState(txnState.getCurrentTransactionId()) + .transferTransactionalStateFromCurrentTransaction() .transferPayloadData(getTestName()) .transferHandle(UnsignedInteger.ONE) .transfer().consumeResponse(Disposition.class) @@ -283,9 +274,9 @@ public class DischargeTest extends BrokerAdminUsingTestBase .detachHandle(UnsignedInteger.ONE) .detachClose(true) .detach().consumeResponse(Detach.class); - interaction.txnDischarge(txnState, false); + interaction.txnDischarge(false); - assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class))); + assertThat(interaction.getCoordinatorLatestDeliveryState(), is(instanceOf(Accepted.class))); } final Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME); diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java index f44790f..02d28a7 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java @@ -58,7 +58,6 @@ import org.apache.qpid.tests.protocol.Response; import org.apache.qpid.tests.protocol.SpecificationTest; import org.apache.qpid.tests.protocol.v1_0.FrameTransport; import org.apache.qpid.tests.protocol.v1_0.Interaction; -import org.apache.qpid.tests.protocol.v1_0.InteractionTransactionalState; import org.apache.qpid.tests.protocol.v1_0.Utils; import org.apache.qpid.tests.utils.BrokerAdmin; import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase; @@ -86,7 +85,6 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase final UnsignedInteger linkHandle = UnsignedInteger.ONE; final Interaction interaction = transport.newInteraction(); - final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO); Disposition responseDisposition = interaction.negotiateProtocol() .consumeResponse() .open() @@ -94,8 +92,8 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase .begin() .consumeResponse(Begin.class) - .txnAttachCoordinatorLink(txnState) - .txnDeclare(txnState) + .txnAttachCoordinatorLink(UnsignedInteger.ZERO) + .txnDeclare() .attachRole(Role.SENDER) .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME) @@ -106,7 +104,7 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase .transferDeliveryId() .transferHandle(linkHandle) .transferPayloadData(getTestName()) - .transferTransactionalState(txnState.getCurrentTransactionId()) + .transferTransactionalStateFromCurrentTransaction() .transfer() .consumeResponse(Disposition.class) .getLatestResponse(Disposition.class); @@ -116,9 +114,9 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase assertThat(responseDisposition.getState(), is(instanceOf(TransactionalState.class))); assertThat(((TransactionalState) responseDisposition.getState()).getOutcome(), is(instanceOf(Accepted.class))); - interaction.txnDischarge(txnState, false); + interaction.txnDischarge(false); - assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class))); + assertThat(interaction.getCoordinatorLatestDeliveryState(), is(instanceOf(Accepted.class))); } Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME); assertThat(receivedMessage, is(equalTo(getTestName()))); @@ -136,7 +134,6 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase final UnsignedInteger linkHandle = UnsignedInteger.ONE; final Interaction interaction = transport.newInteraction(); - final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO); Disposition responseDisposition = interaction.negotiateProtocol() .consumeResponse() .open() @@ -144,8 +141,8 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase .begin() .consumeResponse(Begin.class) - .txnAttachCoordinatorLink(txnState) - .txnDeclare(txnState) + .txnAttachCoordinatorLink(UnsignedInteger.ZERO) + .txnDeclare() .attachRole(Role.SENDER) .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME) @@ -156,7 +153,7 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase .transferDeliveryId() .transferHandle(linkHandle) .transferPayloadData(getTestName()) - .transferTransactionalState(txnState.getCurrentTransactionId()) + .transferTransactionalStateFromCurrentTransaction() .transfer() .consumeResponse(Disposition.class) .getLatestResponse(Disposition.class); @@ -166,9 +163,9 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase assertThat(responseDisposition.getState(), is(instanceOf(TransactionalState.class))); assertThat(((TransactionalState) responseDisposition.getState()).getOutcome(), is(instanceOf(Accepted.class))); - interaction.txnDischarge(txnState, true); + interaction.txnDischarge(true); - assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class))); + assertThat(interaction.getCoordinatorLatestDeliveryState(), is(instanceOf(Accepted.class))); final String content = getTestName() + "_2"; Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, content); @@ -188,7 +185,6 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase final UnsignedInteger linkHandle = UnsignedInteger.ONE; final Interaction interaction = transport.newInteraction(); - final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO); Disposition responseDisposition = interaction.negotiateProtocol() .consumeResponse() .open() @@ -196,8 +192,8 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase .begin() .consumeResponse(Begin.class) - .txnAttachCoordinatorLink(txnState) - .txnDeclare(txnState) + .txnAttachCoordinatorLink(UnsignedInteger.ZERO) + .txnDeclare() .attachRole(Role.SENDER) .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME) @@ -210,7 +206,7 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase .transferDeliveryId() .transferHandle(linkHandle) .transferPayloadData(getTestName()) - .transferTransactionalState(txnState.getCurrentTransactionId()) + .transferTransactionalStateFromCurrentTransaction() .transfer() .consumeResponse(Disposition.class) .getLatestResponse(Disposition.class); @@ -222,12 +218,12 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase interaction.dispositionRole(Role.SENDER) .dispositionSettled(true) - .dispositionTransactionalState(txnState.getCurrentTransactionId(), new Accepted()) + .dispositionTransactionalStateFromCurrentTransaction(new Accepted()) .disposition(); - interaction.txnDischarge(txnState, false); + interaction.txnDischarge(false); - assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class))); + assertThat(interaction.getCoordinatorLatestDeliveryState(), is(instanceOf(Accepted.class))); } assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(getTestName()))); } @@ -244,13 +240,13 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase final UnsignedInteger linkHandle = UnsignedInteger.ONE; final Interaction interaction = transport.newInteraction(); - final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO); + Response<?> response = interaction.negotiateProtocol().consumeResponse() .open().consumeResponse(Open.class) .begin().consumeResponse(Begin.class) - .txnAttachCoordinatorLink(txnState) - .txnDeclare(txnState) + .txnAttachCoordinatorLink(UnsignedInteger.ZERO) + .txnDeclare() .attachRole(Role.SENDER) .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME) @@ -280,7 +276,7 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { final Interaction interaction = transport.newInteraction(); - final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO); + interaction.negotiateProtocol() .consumeResponse() .open() @@ -288,8 +284,8 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase .begin() .consumeResponse(Begin.class) - .txnAttachCoordinatorLink(txnState) - .txnDeclare(txnState) + .txnAttachCoordinatorLink(UnsignedInteger.ZERO) + .txnDeclare() .attachRole(Role.RECEIVER) .attachHandle(UnsignedInteger.ONE) @@ -314,12 +310,10 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase interaction.dispositionSettled(true) .dispositionRole(Role.RECEIVER) - .dispositionTransactionalState(txnState.getCurrentTransactionId(), new Accepted()) - .disposition() - .txnDischarge(txnState, false); - + .dispositionTransactionalStateFromCurrentTransaction(new Accepted()) + .disposition().txnDischarge(false); - assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class))); + assertThat(interaction.getCoordinatorLatestDeliveryState(), is(instanceOf(Accepted.class))); } } @@ -332,7 +326,6 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { final Interaction interaction = transport.newInteraction(); - final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO); interaction.negotiateProtocol() .consumeResponse() .open() @@ -340,8 +333,8 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase .begin() .consumeResponse(Begin.class) - .txnAttachCoordinatorLink(txnState) - .txnDeclare(txnState) + .txnAttachCoordinatorLink(UnsignedInteger.ZERO) + .txnDeclare() .attachRole(Role.RECEIVER) .attachHandle(UnsignedInteger.ONE) @@ -366,11 +359,10 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase interaction.dispositionSettled(true) .dispositionRole(Role.RECEIVER) - .dispositionTransactionalState(txnState.getCurrentTransactionId(), new Accepted()) - .disposition() - .txnDischarge(txnState, true); + .dispositionTransactionalStateFromCurrentTransaction(new Accepted()) + .disposition().txnDischarge(true); - assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class))); + assertThat(interaction.getCoordinatorLatestDeliveryState(), is(instanceOf(Accepted.class))); Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME); assertThat(receivedMessage, is(equalTo(getTestName()))); @@ -390,13 +382,12 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { final Interaction interaction = transport.newInteraction(); - final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO); List<Transfer> transfers = interaction.negotiateProtocol().consumeResponse() .open().consumeResponse(Open.class) .begin().consumeResponse(Begin.class) - .txnAttachCoordinatorLink(txnState) - .txnDeclare(txnState) + .txnAttachCoordinatorLink(UnsignedInteger.ZERO) + .txnDeclare() .attachRole(Role.RECEIVER) .attachHandle(UnsignedInteger.ONE) @@ -446,7 +437,6 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { final Interaction interaction = transport.newInteraction(); - final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO); interaction.negotiateProtocol() .consumeResponse() .open() @@ -454,8 +444,8 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase .begin() .consumeResponse(Begin.class) - .txnAttachCoordinatorLink(txnState) - .txnDeclare(txnState) + .txnAttachCoordinatorLink(UnsignedInteger.ZERO) + .txnDeclare() .attachRole(Role.RECEIVER) .attachHandle(UnsignedInteger.ONE) @@ -480,8 +470,7 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase Disposition settledDisposition = interaction.dispositionSettled(false) .dispositionRole(Role.RECEIVER) - .dispositionTransactionalState(txnState.getCurrentTransactionId(), - new Accepted()) + .dispositionTransactionalStateFromCurrentTransaction(new Accepted()) .disposition() .consumeResponse(Disposition.class) .getLatestResponse(Disposition.class); @@ -490,9 +479,8 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase assertThat(settledDisposition.getState(), is(instanceOf(TransactionalState.class))); assertThat(((TransactionalState) settledDisposition.getState()).getOutcome(), is(instanceOf(Accepted.class))); - interaction.txnDischarge(txnState, false); - - assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class))); + interaction.txnDischarge(false); + assertThat(interaction.getCoordinatorLatestDeliveryState(), is(instanceOf(Accepted.class))); } } @@ -509,7 +497,6 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { final Interaction interaction = transport.newInteraction(); - final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO); interaction.negotiateProtocol() .consumeResponse() .open() @@ -517,8 +504,8 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase .begin() .consumeResponse(Begin.class) - .txnAttachCoordinatorLink(txnState) - .txnDeclare(txnState) + .txnAttachCoordinatorLink(UnsignedInteger.ZERO) + .txnDeclare() .attachRole(Role.RECEIVER) .attachHandle(UnsignedInteger.ONE) @@ -533,7 +520,8 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase .flowNextOutgoingId(UnsignedInteger.ZERO) .flowLinkCredit(UnsignedInteger.ONE) .flowHandleFromLinkHandle() - .flowProperties(Collections.singletonMap(Symbol.valueOf("txn-id"), txnState.getCurrentTransactionId())) + .flowProperties(Collections.singletonMap(Symbol.valueOf("txn-id"), + interaction.getCurrentTransactionId())) .flow() .receiveDelivery(); @@ -546,17 +534,15 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase interaction.dispositionSettled(true) .dispositionRole(Role.RECEIVER) - .dispositionTransactionalState(txnState.getCurrentTransactionId(), new Accepted()) + .dispositionTransactionalStateFromCurrentTransaction(new Accepted()) .dispositionFirstFromLatestDelivery() - .disposition() - .txnDischarge(txnState, false); - + .disposition().txnDischarge(false); - assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class))); + assertThat(interaction.getCoordinatorLatestDeliveryState(), is(instanceOf(Accepted.class))); Transfer transfer = transfers.get(0); assumeThat(transfer.getState(), is(instanceOf(TransactionalState.class))); - assumeThat(((TransactionalState) transfer.getState()).getTxnId(), is(equalTo(txnState.getCurrentTransactionId()))); + assumeThat(((TransactionalState) transfer.getState()).getTxnId(), is(equalTo(interaction.getCurrentTransactionId()))); final String content = getTestName() + "_2"; Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, content); @@ -577,7 +563,6 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { final Interaction interaction = transport.newInteraction(); - final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO); interaction.negotiateProtocol() .consumeResponse() .open() @@ -585,8 +570,8 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase .begin() .consumeResponse(Begin.class) - .txnAttachCoordinatorLink(txnState) - .txnDeclare(txnState) + .txnAttachCoordinatorLink(UnsignedInteger.ZERO) + .txnDeclare() .attachRole(Role.RECEIVER) .attachHandle(UnsignedInteger.ONE) @@ -601,7 +586,8 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase .flowNextOutgoingId(UnsignedInteger.ZERO) .flowLinkCredit(UnsignedInteger.ONE) .flowHandleFromLinkHandle() - .flowProperties(Collections.singletonMap(Symbol.valueOf("txn-id"), txnState.getCurrentTransactionId())) + .flowProperties(Collections.singletonMap(Symbol.valueOf("txn-id"), + interaction.getCurrentTransactionId())) .flow() .receiveDelivery(); @@ -614,17 +600,16 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase interaction.dispositionSettled(true) .dispositionRole(Role.RECEIVER) - .dispositionTransactionalState(txnState.getCurrentTransactionId(), new Accepted()) - .disposition() - .txnDischarge(txnState, true); + .dispositionTransactionalState(interaction.getCurrentTransactionId(), new Accepted()) + .disposition().txnDischarge(true); - assertThat(txnState.getDeliveryState(), is(instanceOf(Accepted.class))); + assertThat(interaction.getCoordinatorLatestDeliveryState(), is(instanceOf(Accepted.class))); assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(getTestName()))); Transfer transfer = transfers.get(0); assumeThat(transfer.getState(), is(instanceOf(TransactionalState.class))); - assumeThat(((TransactionalState) transfer.getState()).getTxnId(), is(equalTo(txnState.getCurrentTransactionId()))); + assumeThat(((TransactionalState) transfer.getState()).getTxnId(), is(equalTo(interaction.getCurrentTransactionId()))); } } @@ -643,7 +628,6 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase try (FrameTransport transport = new FrameTransport(_brokerAddress).connect()) { final Interaction interaction = transport.newInteraction(); - final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO); Response<?> response = interaction.negotiateProtocol() .consumeResponse() .open() @@ -651,8 +635,8 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase .begin() .consumeResponse(Begin.class) - .txnAttachCoordinatorLink(txnState) - .txnDeclare(txnState) + .txnAttachCoordinatorLink(UnsignedInteger.ZERO) + .txnDeclare() .attachRole(Role.RECEIVER) .attachHandle(UnsignedInteger.ONE) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
