This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git


The following commit(s) were added to refs/heads/main by this push:
     new a16c29e  PROTON-2505 Ensure that stream sender write fail fast on 
reconnection
a16c29e is described below

commit a16c29eae32a6a2eae6f9c06b88108c53c4155b2
Author: Timothy Bish <[email protected]>
AuthorDate: Wed Mar 9 17:05:52 2022 -0500

    PROTON-2505 Ensure that stream sender write fail fast on reconnection
    
    When a stream sender message is in use and the connection drops and
    reconnects future write should fail fast an not wait for the write
    timeout.
---
 .../qpid/protonj2/client/impl/ClientSender.java    |  15 ++
 .../protonj2/client/impl/ClientStreamSender.java   |   2 +-
 .../client/impl/ReconnectStreamSenderTest.java     | 160 +++++++++++++++------
 3 files changed, 136 insertions(+), 41 deletions(-)

diff --git 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java
 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java
index 3d3a80f..8686c4c 100644
--- 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java
+++ 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java
@@ -628,12 +628,27 @@ class ClientSender implements Sender {
     }
 
     protected boolean notClosedOrFailed(ClientFuture<?> request) {
+        return notClosedOrFailed(request, protonSender);
+    }
+
+    protected boolean notClosedOrFailed(ClientFuture<?> request, 
org.apache.qpid.protonj2.engine.Sender sender) {
         if (isClosed()) {
             request.failed(new ClientIllegalStateException("The Sender was 
explicitly closed", failureCause));
             return false;
         } else if (failureCause != null) {
             request.failed(failureCause);
             return false;
+        } else if (sender.isLocallyClosedOrDetached()) {
+            if (sender.getConnection().getRemoteCondition() != null) {
+                
request.failed(ClientExceptionSupport.convertToConnectionClosedException(sender.getConnection().getRemoteCondition()));
+            } else if (sender.getSession().getRemoteCondition() != null) {
+                
request.failed(ClientExceptionSupport.convertToSessionClosedException(sender.getSession().getRemoteCondition()));
+            } else if (sender.getEngine().failureCause() != null) {
+                
request.failed(ClientExceptionSupport.convertToConnectionClosedException(sender.getEngine().failureCause()));
+            } else {
+                request.failed(new ClientIllegalStateException("Sender closed 
without a specific error condition"));
+            }
+            return false;
         } else {
             return true;
         }
diff --git 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
index 0edebac..491fe31 100644
--- 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
+++ 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
@@ -125,7 +125,7 @@ public final class ClientStreamSender extends ClientSender 
implements StreamSend
             this, context.getProtonDelivery(), messageFormat, buffer, 
context.completed(), operation);
 
         executor.execute(() -> {
-            if (notClosedOrFailed(operation)) {
+            if (notClosedOrFailed(operation, 
context.getProtonDelivery().getLink())) {
                 try {
                     if (protonSender.isSendable()) {
                         session.getTransactionContext().send(envelope, null, 
isSendingSettled());
diff --git 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectStreamSenderTest.java
 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectStreamSenderTest.java
index 0dd4d18..0796d07 100644
--- 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectStreamSenderTest.java
+++ 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectStreamSenderTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.qpid.protonj2.client.impl;
 
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
@@ -31,7 +32,10 @@ import org.apache.qpid.protonj2.client.ConnectionOptions;
 import org.apache.qpid.protonj2.client.OutputStreamOptions;
 import org.apache.qpid.protonj2.client.StreamSender;
 import org.apache.qpid.protonj2.client.StreamSenderMessage;
+import org.apache.qpid.protonj2.client.StreamSenderOptions;
+import 
org.apache.qpid.protonj2.client.exceptions.ClientConnectionRemotelyClosedException;
 import org.apache.qpid.protonj2.client.exceptions.ClientException;
+import org.apache.qpid.protonj2.client.exceptions.ClientSendTimedOutException;
 import org.apache.qpid.protonj2.client.test.ImperativeClientTestCase;
 import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
 import 
org.apache.qpid.protonj2.test.driver.matchers.transport.TransferPayloadCompositeMatcher;
@@ -47,8 +51,7 @@ class ReconnectStreamSenderTest extends 
ImperativeClientTestCase {
 
     @Test
     void testStreamMessageFlushFailsAfterConnectionDropped() throws Exception {
-        try (ProtonTestServer firstPeer = new ProtonTestServer();
-             ProtonTestServer finalPeer = new ProtonTestServer()) {
+        try (ProtonTestServer firstPeer = new ProtonTestServer(); 
ProtonTestServer finalPeer = new ProtonTestServer()) {
 
             firstPeer.expectSASLAnonymousConnect();
             firstPeer.expectOpen().respond();
@@ -92,14 +95,16 @@ class ReconnectStreamSenderTest extends 
ImperativeClientTestCase {
             
firstPeer.expectTransfer().withPayload(payloadMatcher2).withMore(true);
             firstPeer.dropAfterLastHandler();
 
-            // Write two then after connection drops the message should fail 
on future writes
+            // Write two then after connection drops the message should fail 
on future
+            // writes
             stream.write(new byte[] { 0, 1, 2, 3 });
             stream.flush();
             stream.write(new byte[] { 4, 5, 6, 7 });
             stream.flush();
 
             firstPeer.waitForScriptToComplete();
-            // Reconnection should have occurred now and we should not be able 
to flush data from
+            // Reconnection should have occurred now and we should not be able 
to flush data
+            // from
             // the stream as its initial sender instance was closed on 
disconnect.
             finalPeer.waitForScriptToComplete();
             finalPeer.expectClose().respond();
@@ -122,35 +127,34 @@ class ReconnectStreamSenderTest extends 
ImperativeClientTestCase {
 
     @Test
     void testStreamMessageCloseThatFlushesFailsAfterConnectionDropped() throws 
Exception {
-        try (ProtonTestServer firstPeer = new ProtonTestServer();
-             ProtonTestServer finalPeer = new ProtonTestServer()) {
-
-           firstPeer.expectSASLAnonymousConnect();
-           firstPeer.expectOpen().respond();
-           firstPeer.expectBegin().respond();
-           firstPeer.expectAttach().ofSender().respond();
-           firstPeer.remoteFlow().withLinkCredit(1).queue();
-           firstPeer.start();
-
-           finalPeer.expectSASLAnonymousConnect();
-           finalPeer.expectOpen().respond();
-           finalPeer.expectBegin().respond();
-           finalPeer.expectAttach().ofSender().respond();
-           finalPeer.remoteFlow().withLinkCredit(1).queue();
-           finalPeer.start();
-
-           final URI primaryURI = firstPeer.getServerURI();
-           final URI backupURI = finalPeer.getServerURI();
-
-           ConnectionOptions options = new ConnectionOptions();
-           options.idleTimeout(5, TimeUnit.SECONDS);
-           options.reconnectOptions().reconnectEnabled(true);
-           
options.reconnectOptions().addReconnectLocation(backupURI.getHost(), 
backupURI.getPort());
-
-           Client container = Client.create();
-           Connection connection = container.connect(primaryURI.getHost(), 
primaryURI.getPort(), options);
-           StreamSender sender = connection.openStreamSender("test-queue");
-           StreamSenderMessage message = sender.beginMessage();
+        try (ProtonTestServer firstPeer = new ProtonTestServer(); 
ProtonTestServer finalPeer = new ProtonTestServer()) {
+
+            firstPeer.expectSASLAnonymousConnect();
+            firstPeer.expectOpen().respond();
+            firstPeer.expectBegin().respond();
+            firstPeer.expectAttach().ofSender().respond();
+            firstPeer.remoteFlow().withLinkCredit(1).queue();
+            firstPeer.start();
+
+            finalPeer.expectSASLAnonymousConnect();
+            finalPeer.expectOpen().respond();
+            finalPeer.expectBegin().respond();
+            finalPeer.expectAttach().ofSender().respond();
+            finalPeer.remoteFlow().withLinkCredit(1).queue();
+            finalPeer.start();
+
+            final URI primaryURI = firstPeer.getServerURI();
+            final URI backupURI = finalPeer.getServerURI();
+
+            ConnectionOptions options = new ConnectionOptions();
+            options.idleTimeout(5, TimeUnit.SECONDS);
+            options.reconnectOptions().reconnectEnabled(true);
+            
options.reconnectOptions().addReconnectLocation(backupURI.getHost(), 
backupURI.getPort());
+
+            Client container = Client.create();
+            Connection connection = container.connect(primaryURI.getHost(), 
primaryURI.getPort(), options);
+            StreamSender sender = connection.openStreamSender("test-queue");
+            StreamSenderMessage message = sender.beginMessage();
 
             OutputStream stream = message.body();
 
@@ -167,7 +171,8 @@ class ReconnectStreamSenderTest extends 
ImperativeClientTestCase {
             
firstPeer.expectTransfer().withPayload(payloadMatcher2).withMore(true);
             firstPeer.dropAfterLastHandler();
 
-            // Write two then after connection drops the message should fail 
on future writes
+            // Write two then after connection drops the message should fail 
on future
+            // writes
             stream.write(new byte[] { 0, 1, 2, 3 });
             stream.flush();
             stream.write(new byte[] { 4, 5, 6, 7 });
@@ -175,7 +180,8 @@ class ReconnectStreamSenderTest extends 
ImperativeClientTestCase {
 
             firstPeer.waitForScriptToComplete();
 
-            // Reconnection should have occurred now and we should not be able 
to flush data from
+            // Reconnection should have occurred now and we should not be able 
to flush data
+            // from
             // the stream as its initial sender instance was closed on 
disconnect.
             finalPeer.waitForScriptToComplete();
             finalPeer.expectClose().respond();
@@ -198,8 +204,7 @@ class ReconnectStreamSenderTest extends 
ImperativeClientTestCase {
 
     @Test
     void testStreamMessageWriteThatFlushesFailsAfterConnectionDropped() throws 
Exception {
-        try (ProtonTestServer firstPeer = new ProtonTestServer();
-             ProtonTestServer finalPeer = new ProtonTestServer()) {
+        try (ProtonTestServer firstPeer = new ProtonTestServer(); 
ProtonTestServer finalPeer = new ProtonTestServer()) {
 
             firstPeer.expectSASLAnonymousConnect();
             firstPeer.expectOpen().respond();
@@ -237,7 +242,8 @@ class ReconnectStreamSenderTest extends 
ImperativeClientTestCase {
 
             firstPeer.waitForScriptToComplete();
 
-            // Reconnection should have occurred now and we should not be able 
to flush data from
+            // Reconnection should have occurred now and we should not be able 
to flush data
+            // from
             // the stream as its initial sender instance was closed on 
disconnect.
             finalPeer.waitForScriptToComplete();
             finalPeer.expectClose().respond();
@@ -256,9 +262,83 @@ class ReconnectStreamSenderTest extends 
ImperativeClientTestCase {
     }
 
     @Test
+    public void 
testStreamMessageWriteThatFlushesFailsAfterConnectionDroppedAndReconnected() 
throws Exception {
+        try (ProtonTestServer firstPeer = new ProtonTestServer(); 
ProtonTestServer finalPeer = new ProtonTestServer()) {
+
+            EncodedDataMatcher dataMatcher = new EncodedDataMatcher(new byte[] 
{ 0, 1, 2, 3 });
+            TransferPayloadCompositeMatcher payloadMatcher = new 
TransferPayloadCompositeMatcher();
+            payloadMatcher.setMessageContentMatcher(dataMatcher);
+
+            firstPeer.expectSASLAnonymousConnect();
+            firstPeer.expectOpen().respond();
+            firstPeer.expectBegin().respond();
+            firstPeer.expectAttach().ofSender().respond();
+            firstPeer.remoteFlow().withLinkCredit(1).queue();
+            
firstPeer.expectTransfer().withPayload(payloadMatcher).withMore(true);
+            firstPeer.dropAfterLastHandler();
+            firstPeer.start();
+
+            finalPeer.expectSASLAnonymousConnect();
+            finalPeer.expectOpen().respond();
+            finalPeer.expectBegin().respond();
+            finalPeer.expectAttach().ofSender().respond();
+            finalPeer.start();
+
+            final URI primaryURI = firstPeer.getServerURI();
+            final URI backupURI = finalPeer.getServerURI();
+
+            ConnectionOptions options = new ConnectionOptions();
+            options.maxFrameSize(32768);
+            options.idleTimeout(5, TimeUnit.SECONDS);
+            options.reconnectOptions().reconnectEnabled(true);
+            
options.reconnectOptions().addReconnectLocation(backupURI.getHost(), 
backupURI.getPort());
+
+            Client container = Client.create();
+            Connection connection = container.connect(primaryURI.getHost(), 
primaryURI.getPort(), options);
+            StreamSenderOptions senderOptions = new StreamSenderOptions();
+            senderOptions.sendTimeout(1000);
+            StreamSender sender = connection.openStreamSender("test-queue", 
senderOptions);
+            StreamSenderMessage message = sender.beginMessage();
+            OutputStream stream = message.body();
+
+            stream.write(new byte[] { 0, 1, 2, 3 });
+            stream.flush();
+
+            firstPeer.waitForScriptToComplete();
+
+            // Reconnection should have occurred now and we should not be able 
to flush data
+            // from the stream as its initial sender instance was closed on 
disconnect.
+            finalPeer.waitForScriptToComplete();
+
+            // Ensure that idle processing happens in case send blocks so we 
can see the
+            // send timed out exception
+            finalPeer.remoteEmptyFrame().later(5000);
+            finalPeer.remoteEmptyFrame().later(10000);
+            finalPeer.remoteEmptyFrame().later(15000);
+            finalPeer.remoteEmptyFrame().later(20000); // Test timeout kicks 
in now
+            finalPeer.expectClose().respond();
+
+            byte[] payload = new byte[1024];
+            Arrays.fill(payload, (byte) 65);
+
+            try {
+                stream.write(payload);
+                stream.flush();
+                fail("Should not be able to write section after connection 
drop");
+            } catch (IOException ioe) {
+                assertFalse(ioe.getCause() instanceof 
ClientSendTimedOutException);
+                assertTrue(ioe.getCause() instanceof 
ClientConnectionRemotelyClosedException);
+            }
+
+            connection.closeAsync().get();
+
+            finalPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+        }
+    }
+
+    @Test
     void testStreamSenderRecoveredAfterReconnectCanCreateAndStreamBytes() 
throws Exception {
-        try (ProtonTestServer firstPeer = new ProtonTestServer();
-             ProtonTestServer finalPeer = new ProtonTestServer()) {
+        try (ProtonTestServer firstPeer = new ProtonTestServer(); 
ProtonTestServer finalPeer = new ProtonTestServer()) {
 
             firstPeer.expectSASLAnonymousConnect();
             firstPeer.expectOpen().respond();

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to