This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push:
new 2b0e5e3e PROTON-2636 Allow the awaitAccepted API to work inside a
transaction
2b0e5e3e is described below
commit 2b0e5e3e200dd70342f875f1c190d3c3433dbb05
Author: Timothy Bish <[email protected]>
AuthorDate: Thu Oct 27 16:14:50 2022 -0400
PROTON-2636 Allow the awaitAccepted API to work inside a transaction
The transactional state type should check the carried outcome to see if
that outcome is accepted and report that state when the tracker asks if
the state is accepted.
---
.../protonj2/client/impl/ClientDeliveryState.java | 5 ++
.../protonj2/client/impl/TransactionsTest.java | 55 ++++++++++++++++++++++
2 files changed, 60 insertions(+)
diff --git
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientDeliveryState.java
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientDeliveryState.java
index 06e27cc4..883aff69 100644
---
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientDeliveryState.java
+++
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientDeliveryState.java
@@ -304,6 +304,11 @@ public abstract class ClientDeliveryState implements
DeliveryState {
return Type.TRANSACTIONAL;
}
+ @Override
+ public boolean isAccepted() {
+ return txnState.getOutcome() instanceof Accepted;
+ }
+
@Override
org.apache.qpid.protonj2.types.transport.DeliveryState
getProtonDeliveryState() {
return txnState;
diff --git
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/TransactionsTest.java
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/TransactionsTest.java
index 8d71b0b7..55429625 100644
---
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/TransactionsTest.java
+++
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/TransactionsTest.java
@@ -930,6 +930,61 @@ public class TransactionsTest extends
ImperativeClientTestCase {
}
}
+ @Test
+ public void testAwaitSettlementWorksForMessageSentInTransaction() throws
Exception {
+ final byte[] txnId = new byte[] { 0, 1, 2, 3 };
+
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofSender().respond();
+ peer.remoteFlow().withLinkCredit(1).queue();
+ peer.expectCoordinatorAttach().respond();
+ peer.remoteFlow().withLinkCredit(2).queue();
+ peer.expectDeclare().accept(txnId);
+ peer.expectTransfer().withHandle(0)
+ .withNonNullPayload()
+
.withState().transactional().withTxnId(txnId).and()
+ .respond()
+
.withState().transactional().withTxnId(txnId).withAccepted().and()
+ .withSettled(true);
+ peer.expectDischarge().withFail(false).withTxnId(txnId).accept();
+ peer.expectEnd().respond();
+ peer.expectClose().respond();
+ peer.start();
+
+ URI remoteURI = peer.getServerURI();
+
+ LOG.info("Test started, peer listening on: {}", remoteURI);
+
+ Client container = Client.create();
+ Connection connection = container.connect(remoteURI.getHost(),
remoteURI.getPort());
+ Session session = connection.openSession().openFuture().get();
+ Sender sender = session.openSender("address").openFuture().get();
+
+ session.beginTransaction();
+
+ final Tracker tracker =
sender.send(Message.create("test-message"));
+
+ assertNotNull(tracker);
+ assertNotNull(tracker.awaitAccepted());
+ assertTrue(tracker.remoteState().isAccepted());
+ assertEquals(tracker.remoteState().getType(),
DeliveryState.Type.TRANSACTIONAL,
+ "Delivery inside transaction should have
Transactional state");
+ assertEquals(tracker.state().getType(),
DeliveryState.Type.TRANSACTIONAL,
+ "Delivery inside transaction should have
Transactional state: " + tracker.state().getType());
+ Wait.assertTrue("Delivery in transaction should be locally settled
after response", () -> tracker.settled());
+
+ session.commitTransaction();
+
+ session.closeAsync();
+ connection.closeAsync().get(10, TimeUnit.SECONDS);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+ }
+
@Test
public void testSendMessagesInsideOfUniqueTransactions() throws Exception {
final byte[] txnId1 = new byte[] { 0, 1, 2, 3 };
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]