This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git


The following commit(s) were added to refs/heads/main by this push:
     new 2b0e5e3e PROTON-2636 Allow the awaitAccepted API to work inside a 
transaction
2b0e5e3e is described below

commit 2b0e5e3e200dd70342f875f1c190d3c3433dbb05
Author: Timothy Bish <[email protected]>
AuthorDate: Thu Oct 27 16:14:50 2022 -0400

    PROTON-2636 Allow the awaitAccepted API to work inside a transaction
    
    The transactional state type should check the carried outcome to see if
    that outcome is accepted and report that state when the tracker asks if
    the state is accepted.
---
 .../protonj2/client/impl/ClientDeliveryState.java  |  5 ++
 .../protonj2/client/impl/TransactionsTest.java     | 55 ++++++++++++++++++++++
 2 files changed, 60 insertions(+)

diff --git 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientDeliveryState.java
 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientDeliveryState.java
index 06e27cc4..883aff69 100644
--- 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientDeliveryState.java
+++ 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientDeliveryState.java
@@ -304,6 +304,11 @@ public abstract class ClientDeliveryState implements 
DeliveryState {
             return Type.TRANSACTIONAL;
         }
 
+        @Override
+        public boolean isAccepted() {
+            return txnState.getOutcome() instanceof Accepted;
+        }
+
         @Override
         org.apache.qpid.protonj2.types.transport.DeliveryState 
getProtonDeliveryState() {
             return txnState;
diff --git 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/TransactionsTest.java
 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/TransactionsTest.java
index 8d71b0b7..55429625 100644
--- 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/TransactionsTest.java
+++ 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/TransactionsTest.java
@@ -930,6 +930,61 @@ public class TransactionsTest extends 
ImperativeClientTestCase {
         }
     }
 
+    @Test
+    public void testAwaitSettlementWorksForMessageSentInTransaction() throws 
Exception {
+        final byte[] txnId = new byte[] { 0, 1, 2, 3 };
+
+        try (ProtonTestServer peer = new ProtonTestServer()) {
+            peer.expectSASLAnonymousConnect();
+            peer.expectOpen().respond();
+            peer.expectBegin().respond();
+            peer.expectAttach().ofSender().respond();
+            peer.remoteFlow().withLinkCredit(1).queue();
+            peer.expectCoordinatorAttach().respond();
+            peer.remoteFlow().withLinkCredit(2).queue();
+            peer.expectDeclare().accept(txnId);
+            peer.expectTransfer().withHandle(0)
+                                 .withNonNullPayload()
+                                 
.withState().transactional().withTxnId(txnId).and()
+                                 .respond()
+                                 
.withState().transactional().withTxnId(txnId).withAccepted().and()
+                                 .withSettled(true);
+            peer.expectDischarge().withFail(false).withTxnId(txnId).accept();
+            peer.expectEnd().respond();
+            peer.expectClose().respond();
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Test started, peer listening on: {}", remoteURI);
+
+            Client container = Client.create();
+            Connection connection = container.connect(remoteURI.getHost(), 
remoteURI.getPort());
+            Session session = connection.openSession().openFuture().get();
+            Sender sender = session.openSender("address").openFuture().get();
+
+            session.beginTransaction();
+
+            final Tracker tracker = 
sender.send(Message.create("test-message"));
+
+            assertNotNull(tracker);
+            assertNotNull(tracker.awaitAccepted());
+            assertTrue(tracker.remoteState().isAccepted());
+            assertEquals(tracker.remoteState().getType(), 
DeliveryState.Type.TRANSACTIONAL,
+                         "Delivery inside transaction should have 
Transactional state");
+            assertEquals(tracker.state().getType(), 
DeliveryState.Type.TRANSACTIONAL,
+                         "Delivery inside transaction should have 
Transactional state: " + tracker.state().getType());
+            Wait.assertTrue("Delivery in transaction should be locally settled 
after response", () -> tracker.settled());
+
+            session.commitTransaction();
+
+            session.closeAsync();
+            connection.closeAsync().get(10, TimeUnit.SECONDS);
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+        }
+    }
+
     @Test
     public void testSendMessagesInsideOfUniqueTransactions() throws Exception {
         final byte[] txnId1 = new byte[] { 0, 1, 2, 3 };


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to