This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push:
new 88e39921 PROTON-2635 Add some additional tests for transactions and
reconnect
88e39921 is described below
commit 88e399211ea1f95a3d1e84b8f5983a5fa998f68e
Author: Timothy Bish <[email protected]>
AuthorDate: Thu Oct 27 14:53:38 2022 -0400
PROTON-2635 Add some additional tests for transactions and reconnect
Add additional tests that check that transactional state is correct
after a client reconnects.
---
.../client/impl/ReconnectTransactionTest.java | 217 +++++++++++++++++++++
1 file changed, 217 insertions(+)
diff --git
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectTransactionTest.java
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectTransactionTest.java
index 29884aa2..6aa250d4 100644
---
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectTransactionTest.java
+++
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectTransactionTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.qpid.protonj2.client.impl;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.fail;
import java.net.URI;
@@ -24,9 +26,15 @@ import java.util.concurrent.TimeUnit;
import org.apache.qpid.protonj2.client.Client;
import org.apache.qpid.protonj2.client.Connection;
import org.apache.qpid.protonj2.client.ConnectionOptions;
+import org.apache.qpid.protonj2.client.DeliveryState;
+import org.apache.qpid.protonj2.client.Message;
+import org.apache.qpid.protonj2.client.Sender;
import org.apache.qpid.protonj2.client.Session;
+import org.apache.qpid.protonj2.client.Tracker;
import org.apache.qpid.protonj2.client.exceptions.ClientException;
+import
org.apache.qpid.protonj2.client.exceptions.ClientTransactionRolledBackException;
import org.apache.qpid.protonj2.client.test.ImperativeClientTestCase;
+import org.apache.qpid.protonj2.client.test.Wait;
import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -90,4 +98,213 @@ class ReconnectTransactionTest extends
ImperativeClientTestCase {
finalPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
+
+    /**
+     * Checks that a transaction which was active when the connection dropped cannot be
+     * committed once the client has reconnected to a different peer.
+     * <p>
+     * The first peer accepts the declare and receives one transfer before the scripted drop
+     * (dropAfterLastHandler). The final peer only scripts the connection, session and sender
+     * re-attach (no coordinator attach or declare), so the commit attempt is expected to fail
+     * locally with {@link ClientTransactionRolledBackException}.
+     */
+    @Test
+    public void testTransactionInDoubtAfterReconnect() throws Exception {
+        final byte[] txnId = new byte[] { 0, 1, 2, 3 };
+
+        try (ProtonTestServer firstPeer = new ProtonTestServer();
+             ProtonTestServer finalPeer = new ProtonTestServer()) {
+
+            // First peer: transaction is declared and one transfer arrives before the drop.
+            firstPeer.expectSASLAnonymousConnect();
+            firstPeer.expectOpen().respond();
+            firstPeer.expectBegin().respond();
+            firstPeer.expectCoordinatorAttach().respond();
+            firstPeer.remoteFlow().withLinkCredit(2).queue();
+            firstPeer.expectDeclare().accept(txnId);
+            firstPeer.expectAttach().ofSender().withTarget().withAddress("test").and().respond();
+            firstPeer.remoteFlow().withLinkCredit(1).queue();
+            firstPeer.expectTransfer().withNonNullPayload();
+            firstPeer.dropAfterLastHandler();
+            firstPeer.start();
+
+            // Final peer: only the connection, session and sender are expected to be rebuilt.
+            finalPeer.expectSASLAnonymousConnect();
+            finalPeer.expectOpen().respond();
+            finalPeer.expectBegin().respond();
+            finalPeer.expectAttach().ofSender().withTarget().withAddress("test").and().respond();
+            finalPeer.start();
+
+            final URI primaryURI = firstPeer.getServerURI();
+            final URI backupURI = finalPeer.getServerURI();
+
+            ConnectionOptions options = new ConnectionOptions();
+            options.reconnectOptions().reconnectEnabled(true);
+            options.reconnectOptions().addReconnectLocation(backupURI.getHost(), backupURI.getPort());
+
+            Client container = Client.create();
+            Connection connection = container.connect(primaryURI.getHost(), primaryURI.getPort(), options);
+            Session session = connection.openSession().openFuture().get();
+
+            session.beginTransaction();
+
+            Sender sender = session.openSender("test").openFuture().get();
+            sender.send(Message.create("Hello"));
+
+            firstPeer.waitForScriptToComplete();
+
+            // Wait for the reconnect script to finish before adding the close expectation.
+            finalPeer.waitForScriptToComplete();
+            finalPeer.expectClose().respond();
+
+            try {
+                session.commitTransaction();
+                fail("Should have failed to commit transaction");
+            } catch (ClientTransactionRolledBackException cliEx) {
+                LOG.info("Caught expected error from test", cliEx);
+            }
+
+            connection.closeAsync().get();
+
+            finalPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+        }
+    }
+
+    /**
+     * Checks that a send performed inside a transaction that was lost to a reconnect does not
+     * reach the new peer, and that the subsequent commit fails.
+     * <p>
+     * The final peer grants link credit after the sender re-attaches but scripts no
+     * expectTransfer, so the second send is expected to put nothing on the wire. The commit
+     * must then fail with {@link ClientTransactionRolledBackException}.
+     */
+    @Test
+    public void testSendInTransactionIsNoOpAfterReconnect() throws Exception {
+        final byte[] txnId = new byte[] { 0, 1, 2, 3 };
+
+        try (ProtonTestServer firstPeer = new ProtonTestServer();
+             ProtonTestServer finalPeer = new ProtonTestServer()) {
+
+            // First peer: transaction is declared and one transfer arrives before the drop.
+            firstPeer.expectSASLAnonymousConnect();
+            firstPeer.expectOpen().respond();
+            firstPeer.expectBegin().respond();
+            firstPeer.expectCoordinatorAttach().respond();
+            firstPeer.remoteFlow().withLinkCredit(2).queue();
+            firstPeer.expectDeclare().accept(txnId);
+            firstPeer.expectAttach().ofSender().withTarget().withAddress("test").and().respond();
+            firstPeer.remoteFlow().withLinkCredit(1).queue();
+            firstPeer.expectTransfer().withNonNullPayload();
+            firstPeer.dropAfterLastHandler();
+            firstPeer.start();
+
+            // Final peer: sender is re-attached and given credit, but no transfer is scripted.
+            finalPeer.expectSASLAnonymousConnect();
+            finalPeer.expectOpen().respond();
+            finalPeer.expectBegin().respond();
+            finalPeer.expectAttach().ofSender().withTarget().withAddress("test").and().respond();
+            finalPeer.remoteFlow().withLinkCredit(1).queue();
+            finalPeer.start();
+
+            final URI primaryURI = firstPeer.getServerURI();
+            final URI backupURI = finalPeer.getServerURI();
+
+            ConnectionOptions options = new ConnectionOptions();
+            options.reconnectOptions().reconnectEnabled(true);
+            options.reconnectOptions().addReconnectLocation(backupURI.getHost(), backupURI.getPort());
+
+            Client container = Client.create();
+            Connection connection = container.connect(primaryURI.getHost(), primaryURI.getPort(), options);
+            Session session = connection.openSession().openFuture().get();
+
+            session.beginTransaction();
+
+            Sender sender = session.openSender("test").openFuture().get();
+            sender.send(Message.create("Hello"));
+
+            firstPeer.waitForScriptToComplete();
+
+            // Wait for the reconnect script to finish before adding the close expectation.
+            finalPeer.waitForScriptToComplete();
+            finalPeer.expectClose().respond();
+
+            // Sent while the old transaction is still the session's current one.
+            sender.send(Message.create("Hello Again"));
+
+            try {
+                session.commitTransaction();
+                fail("Should have failed to commit transaction");
+            } catch (ClientTransactionRolledBackException cliEx) {
+                LOG.info("Caught expected error from test", cliEx);
+            }
+
+            connection.closeAsync().get();
+
+            finalPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+        }
+    }
+
+    /**
+     * Checks that, after a reconnect has invalidated an active transaction, the session can
+     * begin a fresh transaction on the new peer and complete a transactional send and commit.
+     * <p>
+     * Sequence: declare on the first peer, scripted connection drop, failed commit of the old
+     * transaction ({@link ClientTransactionRolledBackException}), then a new declare, one
+     * transactional transfer and a successful discharge against the final peer.
+     */
+    @Test
+    public void testNewTransactionCanBeCreatedAfterOldInstanceRolledBackByReconnect() throws Exception {
+        final byte[] txnId = new byte[] { 0, 1, 2, 3 };
+
+        try (ProtonTestServer firstPeer = new ProtonTestServer();
+             ProtonTestServer finalPeer = new ProtonTestServer()) {
+
+            // First peer: sender attaches, then the transaction is declared before the drop.
+            firstPeer.expectSASLAnonymousConnect();
+            firstPeer.expectOpen().respond();
+            firstPeer.expectBegin().respond();
+            firstPeer.expectAttach().ofSender().withTarget().withAddress("test").and().respond();
+            firstPeer.expectCoordinatorAttach().respond();
+            firstPeer.remoteFlow().withLinkCredit(2).queue();
+            firstPeer.expectDeclare().accept(txnId);
+            // NOTE(review): the argument is presumably a delay before dropping; confirm against ProtonTestServer.
+            firstPeer.dropAfterLastHandler(5);
+            firstPeer.start();
+
+            // Final peer: initially only the connection, session and sender are rebuilt.
+            finalPeer.expectSASLAnonymousConnect();
+            finalPeer.expectOpen().respond();
+            finalPeer.expectBegin().respond();
+            finalPeer.expectAttach().ofSender().withTarget().withAddress("test").and().respond();
+            finalPeer.remoteFlow().withLinkCredit(1).queue();
+            finalPeer.start();
+
+            final URI primaryURI = firstPeer.getServerURI();
+            final URI backupURI = finalPeer.getServerURI();
+
+            ConnectionOptions options = new ConnectionOptions();
+            options.reconnectOptions().reconnectEnabled(true);
+            options.reconnectOptions().addReconnectLocation(backupURI.getHost(), backupURI.getPort());
+
+            Client container = Client.create();
+            Connection connection = container.connect(primaryURI.getHost(), primaryURI.getPort(), options);
+            Session session = connection.openSession().openFuture().get();
+            Sender sender = session.openSender("test").openFuture().get();
+
+            session.beginTransaction();
+
+            firstPeer.waitForScriptToComplete();
+
+            // Once the reconnect script has completed, script the second transaction's lifecycle.
+            finalPeer.waitForScriptToComplete();
+            finalPeer.expectCoordinatorAttach().respond();
+            finalPeer.remoteFlow().withLinkCredit(2).queue();
+            finalPeer.expectDeclare().accept(txnId);
+            finalPeer.expectTransfer().withHandle(0)
+                                      .withNonNullPayload()
+                                      .withState().transactional().withTxnId(txnId).and()
+                                      .respond()
+                                      .withState().transactional().withTxnId(txnId).withAccepted().and()
+                                      .withSettled(true);
+            finalPeer.expectDischarge().withFail(false).withTxnId(txnId).accept();
+            finalPeer.expectEnd().respond();
+            finalPeer.expectClose().respond();
+
+            // The transaction begun before the drop must not be committable.
+            try {
+                session.commitTransaction();
+                fail("Should have failed to commit transaction");
+            } catch (ClientTransactionRolledBackException cliEx) {
+                LOG.info("Caught expected error from test", cliEx);
+            }
+
+            session.beginTransaction();
+
+            final Tracker tracker = sender.send(Message.create("test-message"));
+
+            assertNotNull(tracker);
+            assertNotNull(tracker.settlementFuture().get());
+            // JUnit 5 takes (expected, actual, message); keep that order so failures read correctly.
+            assertEquals(DeliveryState.Type.TRANSACTIONAL, tracker.remoteState().getType(),
+                         "Delivery inside transaction should have Transactional state");
+            assertNotNull(tracker.state());
+            assertEquals(DeliveryState.Type.TRANSACTIONAL, tracker.state().getType(),
+                         "Delivery inside transaction should have Transactional state: " + tracker.state().getType());
+            Wait.assertTrue("Delivery in transaction should be locally settled after response", () -> tracker.settled());
+
+            try {
+                session.commitTransaction();
+            } catch (ClientException cliEx) {
+                LOG.info("Caught unexpected error from test", cliEx);
+                // Attach the cause so the test report shows why the commit failed.
+                fail("Should not have failed to commit transaction", cliEx);
+            }
+
+            session.closeAsync();
+            connection.closeAsync().get();
+
+            finalPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+        }
+    }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]