This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git


The following commit(s) were added to refs/heads/main by this push:
     new 88e39921 PROTON-2635 Add some additional tests for transactions and reconnect
88e39921 is described below

commit 88e399211ea1f95a3d1e84b8f5983a5fa998f68e
Author: Timothy Bish <[email protected]>
AuthorDate: Thu Oct 27 14:53:38 2022 -0400

    PROTON-2635 Add some additional tests for transactions and reconnect
    
    Add additional tests that check that transactional state is correct
    after a client reconnects.
---
 .../client/impl/ReconnectTransactionTest.java      | 217 +++++++++++++++++++++
 1 file changed, 217 insertions(+)

diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectTransactionTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectTransactionTest.java
index 29884aa2..6aa250d4 100644
--- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectTransactionTest.java
+++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectTransactionTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.qpid.protonj2.client.impl;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.fail;
 
 import java.net.URI;
@@ -24,9 +26,15 @@ import java.util.concurrent.TimeUnit;
 import org.apache.qpid.protonj2.client.Client;
 import org.apache.qpid.protonj2.client.Connection;
 import org.apache.qpid.protonj2.client.ConnectionOptions;
+import org.apache.qpid.protonj2.client.DeliveryState;
+import org.apache.qpid.protonj2.client.Message;
+import org.apache.qpid.protonj2.client.Sender;
 import org.apache.qpid.protonj2.client.Session;
+import org.apache.qpid.protonj2.client.Tracker;
 import org.apache.qpid.protonj2.client.exceptions.ClientException;
+import org.apache.qpid.protonj2.client.exceptions.ClientTransactionRolledBackException;
 import org.apache.qpid.protonj2.client.test.ImperativeClientTestCase;
+import org.apache.qpid.protonj2.client.test.Wait;
 import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
@@ -90,4 +98,213 @@ class ReconnectTransactionTest extends ImperativeClientTestCase {
             finalPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
         }
     }
+
+    /**
+     * Checks that a transaction begun before a connection drop cannot be committed once the
+     * client has reconnected to a backup peer.
+     * <p>
+     * The first peer is scripted to accept the coordinator attach, the declare, a sender attach
+     * and a single transfer, and is configured with {@code dropAfterLastHandler()}. The backup
+     * peer is scripted only for the connection handshake and the sender re-attach; no coordinator
+     * attach or declare is scripted on it. The commit is expected to throw
+     * {@link ClientTransactionRolledBackException}.
+     *
+     * @throws Exception if the client or either scripted peer fails unexpectedly
+     */
+    @Test
+    public void testTransactionInDoubtAfterReconnect() throws Exception {
+        final byte[] txnId = new byte[] { 0, 1, 2, 3 };
+
+        try (ProtonTestServer firstPeer = new ProtonTestServer();
+             ProtonTestServer finalPeer = new ProtonTestServer()) {
+
+            // Primary peer: transaction is declared, one message is sent, then the peer drops.
+            firstPeer.expectSASLAnonymousConnect();
+            firstPeer.expectOpen().respond();
+            firstPeer.expectBegin().respond();
+            firstPeer.expectCoordinatorAttach().respond();
+            firstPeer.remoteFlow().withLinkCredit(2).queue();
+            firstPeer.expectDeclare().accept(txnId);
+            firstPeer.expectAttach().ofSender().withTarget().withAddress("test").and().respond();
+            firstPeer.remoteFlow().withLinkCredit(1).queue();
+            firstPeer.expectTransfer().withNonNullPayload();
+            firstPeer.dropAfterLastHandler();
+            firstPeer.start();
+
+            // Backup peer: only the handshake and the sender re-attach are scripted up front.
+            finalPeer.expectSASLAnonymousConnect();
+            finalPeer.expectOpen().respond();
+            finalPeer.expectBegin().respond();
+            finalPeer.expectAttach().ofSender().withTarget().withAddress("test").and().respond();
+            finalPeer.start();
+
+            final URI primaryURI = firstPeer.getServerURI();
+            final URI backupURI = finalPeer.getServerURI();
+
+            ConnectionOptions options = new ConnectionOptions();
+            options.reconnectOptions().reconnectEnabled(true);
+            options.reconnectOptions().addReconnectLocation(backupURI.getHost(), backupURI.getPort());
+
+            Client container = Client.create();
+            Connection connection = container.connect(primaryURI.getHost(), primaryURI.getPort(), options);
+            Session session = connection.openSession().openFuture().get();
+
+            session.beginTransaction();
+
+            Sender sender = session.openSender("test").openFuture().get();
+            sender.send(Message.create("Hello"));
+
+            firstPeer.waitForScriptToComplete();
+
+            // The close is scripted only after the backup peer has consumed the reconnect script.
+            finalPeer.waitForScriptToComplete();
+            finalPeer.expectClose().respond();
+
+            try {
+                session.commitTransaction();
+                fail("Should have failed to commit transaction");
+            } catch (ClientTransactionRolledBackException cliEx) {
+                LOG.info("Caught expected error from test", cliEx);
+            }
+
+            connection.closeAsync().get();
+
+            finalPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+        }
+    }
+
+    /**
+     * Checks that a send issued inside a transaction that was active before a reconnect does
+     * not prevent the subsequent commit from reporting the roll back.
+     * <p>
+     * The first peer is scripted to accept the declare, a sender attach and one transfer, and is
+     * configured with {@code dropAfterLastHandler()}. The backup peer is scripted for the
+     * handshake and sender re-attach and grants one link credit, but its script contains no
+     * transfer expectation before the close. After the reconnect a second message is sent and the
+     * commit is expected to throw {@link ClientTransactionRolledBackException}.
+     * <p>
+     * NOTE(review): this relies on the test peer rejecting unscripted frames to prove that the
+     * second send writes nothing to the wire; confirm against the test driver.
+     *
+     * @throws Exception if the client or either scripted peer fails unexpectedly
+     */
+    @Test
+    public void testSendInTransactionIsNoOpAfterReconnect() throws Exception {
+        final byte[] txnId = new byte[] { 0, 1, 2, 3 };
+
+        try (ProtonTestServer firstPeer = new ProtonTestServer();
+             ProtonTestServer finalPeer = new ProtonTestServer()) {
+
+            // Primary peer: transaction is declared, one message is sent, then the peer drops.
+            firstPeer.expectSASLAnonymousConnect();
+            firstPeer.expectOpen().respond();
+            firstPeer.expectBegin().respond();
+            firstPeer.expectCoordinatorAttach().respond();
+            firstPeer.remoteFlow().withLinkCredit(2).queue();
+            firstPeer.expectDeclare().accept(txnId);
+            firstPeer.expectAttach().ofSender().withTarget().withAddress("test").and().respond();
+            firstPeer.remoteFlow().withLinkCredit(1).queue();
+            firstPeer.expectTransfer().withNonNullPayload();
+            firstPeer.dropAfterLastHandler();
+            firstPeer.start();
+
+            // Backup peer: sender re-attach is granted credit, yet no transfer is scripted.
+            finalPeer.expectSASLAnonymousConnect();
+            finalPeer.expectOpen().respond();
+            finalPeer.expectBegin().respond();
+            finalPeer.expectAttach().ofSender().withTarget().withAddress("test").and().respond();
+            finalPeer.remoteFlow().withLinkCredit(1).queue();
+            finalPeer.start();
+
+            final URI primaryURI = firstPeer.getServerURI();
+            final URI backupURI = finalPeer.getServerURI();
+
+            ConnectionOptions options = new ConnectionOptions();
+            options.reconnectOptions().reconnectEnabled(true);
+            options.reconnectOptions().addReconnectLocation(backupURI.getHost(), backupURI.getPort());
+
+            Client container = Client.create();
+            Connection connection = container.connect(primaryURI.getHost(), primaryURI.getPort(), options);
+            Session session = connection.openSession().openFuture().get();
+
+            session.beginTransaction();
+
+            Sender sender = session.openSender("test").openFuture().get();
+            sender.send(Message.create("Hello"));
+
+            firstPeer.waitForScriptToComplete();
+
+            // The close is scripted only after the backup peer has consumed the reconnect script.
+            finalPeer.waitForScriptToComplete();
+            finalPeer.expectClose().respond();
+
+            // Sent after the reconnect while the pre-reconnect transaction is still the active one.
+            sender.send(Message.create("Hello Again"));
+
+            try {
+                session.commitTransaction();
+                fail("Should have failed to commit transaction");
+            } catch (ClientTransactionRolledBackException cliEx) {
+                LOG.info("Caught expected error from test", cliEx);
+            }
+
+            connection.closeAsync().get();
+
+            finalPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+        }
+    }
+
+    @Test
+    public void 
testNewTransactionCanBeCreatedAfterOldInstanceRolledBackByReconnect() throws 
Exception {
+        final byte[] txnId = new byte[] { 0, 1, 2, 3 };
+
+        try (ProtonTestServer firstPeer = new ProtonTestServer();
+             ProtonTestServer finalPeer = new ProtonTestServer()) {
+
+            firstPeer.expectSASLAnonymousConnect();
+            firstPeer.expectOpen().respond();
+            firstPeer.expectBegin().respond();
+            
firstPeer.expectAttach().ofSender().withTarget().withAddress("test").and().respond();
+            firstPeer.expectCoordinatorAttach().respond();
+            firstPeer.remoteFlow().withLinkCredit(2).queue();
+            firstPeer.expectDeclare().accept(txnId);
+            firstPeer.dropAfterLastHandler(5);
+            firstPeer.start();
+
+            finalPeer.expectSASLAnonymousConnect();
+            finalPeer.expectOpen().respond();
+            finalPeer.expectBegin().respond();
+            
finalPeer.expectAttach().ofSender().withTarget().withAddress("test").and().respond();
+            finalPeer.remoteFlow().withLinkCredit(1).queue();
+            finalPeer.start();
+
+            final URI primaryURI = firstPeer.getServerURI();
+            final URI backupURI = finalPeer.getServerURI();
+
+            ConnectionOptions options = new ConnectionOptions();
+            options.reconnectOptions().reconnectEnabled(true);
+            
options.reconnectOptions().addReconnectLocation(backupURI.getHost(), 
backupURI.getPort());
+
+            Client container = Client.create();
+            Connection connection = container.connect(primaryURI.getHost(), 
primaryURI.getPort(), options);
+            Session session = connection.openSession().openFuture().get();
+            Sender sender = session.openSender("test").openFuture().get();
+
+            session.beginTransaction();
+
+            firstPeer.waitForScriptToComplete();
+
+            finalPeer.waitForScriptToComplete();
+            finalPeer.expectCoordinatorAttach().respond();
+            finalPeer.remoteFlow().withLinkCredit(2).queue();
+            finalPeer.expectDeclare().accept(txnId);
+            finalPeer.expectTransfer().withHandle(0)
+                                      .withNonNullPayload()
+                                      
.withState().transactional().withTxnId(txnId).and()
+                                      .respond()
+                                      
.withState().transactional().withTxnId(txnId).withAccepted().and()
+                                      .withSettled(true);
+            
finalPeer.expectDischarge().withFail(false).withTxnId(txnId).accept();
+            finalPeer.expectEnd().respond();
+            finalPeer.expectClose().respond();
+
+            try {
+                session.commitTransaction();
+                fail("Should have failed to declare transaction");
+            } catch (ClientTransactionRolledBackException cliEx) {
+                LOG.info("Caught expected error from test", cliEx);
+            }
+
+            session.beginTransaction();
+
+            final Tracker tracker = 
sender.send(Message.create("test-message"));
+
+            assertNotNull(tracker);
+            assertNotNull(tracker.settlementFuture().get());
+            assertEquals(tracker.remoteState().getType(), 
DeliveryState.Type.TRANSACTIONAL,
+                         "Delivery inside transaction should have 
Transactional state");
+            assertNotNull(tracker.state());
+            assertEquals(tracker.state().getType(), 
DeliveryState.Type.TRANSACTIONAL,
+                         "Delivery inside transaction should have 
Transactional state: " + tracker.state().getType());
+            Wait.assertTrue("Delivery in transaction should be locally settled 
after response", () -> tracker.settled());
+
+            try {
+                session.commitTransaction();
+            } catch (ClientException cliEx) {
+                LOG.info("Caught umexpected error from test", cliEx);
+                fail("Should not have failed to declare transaction");
+            }
+
+            session.closeAsync();
+            connection.closeAsync().get();
+
+            finalPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to