This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git


The following commit(s) were added to refs/heads/main by this push:
     new 8e74916d PROTON-2697 Resolve exception if receive timeout is less than minus one
8e74916d is described below

commit 8e74916dac8455fa711a75d8f4cfab827e6f4d78
Author: Timothy Bish <[email protected]>
AuthorDate: Mon Mar 27 13:04:23 2023 -0400

    PROTON-2697 Resolve exception if receive timeout is less than minus one
    
    Accept all negative values as an infinite wait. Unit conversions can
    produce values lower than minus one, which previously caused an
    exception on thread wait.
---
 .../org/apache/qpid/protonj2/client/Receiver.java  | 12 ++---
 .../qpid/protonj2/client/StreamReceiver.java       | 12 ++---
 .../qpid/protonj2/client/impl/ClientReceiver.java  |  2 +-
 .../qpid/protonj2/client/impl/ReceiverTest.java    | 57 +++++++++++++++++++++
 .../protonj2/client/impl/StreamReceiverTest.java   | 58 ++++++++++++++++++++++
 5 files changed, 128 insertions(+), 13 deletions(-)

diff --git 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Receiver.java 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Receiver.java
index ba6cefd4..dcbf2cf1 100644
--- 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Receiver.java
+++ 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Receiver.java
@@ -64,14 +64,14 @@ public interface Receiver extends Link<Receiver> {
 
     /**
      * Blocking receive method that waits the given time interval for the 
remote to provide a
-     * {@link Delivery} for consumption.  The amount of time this method 
blocks is based on the
-     * timeout value. If timeout is equal to <code>-1</code> then it blocks 
until a Delivery is
-     * received. If timeout is equal to zero then it will not block and simply 
return a
-     * {@link Delivery} if one is available locally.  If timeout value is 
greater than zero then it
-     * blocks up to timeout amount of time.
+     * {@link Delivery} for consumption. The amount of time this method blocks 
is based on the
+     * timeout value. If the timeout is less than zero then it blocks until a 
Delivery is received.
+     * If the timeout is equal to zero then it will not block and simply 
return a {@link Delivery}
+     * if one is available locally. If the timeout value is greater than zero 
then it blocks up to
+     * timeout amount of time.
      * <p>
      * Receive calls will only grant credit on their own if a credit window is 
configured in the
-     * {@link ReceiverOptions} which is done by default.  If the client 
application has not configured
+     * {@link ReceiverOptions} which is done by default. If the client 
application has not configured
      * a credit window or granted credit manually this method will not 
automatically grant any credit
      * when it enters the wait for a new incoming {@link Delivery}.
      *
diff --git 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/StreamReceiver.java
 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/StreamReceiver.java
index 3de8aa76..570d281a 100644
--- 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/StreamReceiver.java
+++ 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/StreamReceiver.java
@@ -71,14 +71,14 @@ public interface StreamReceiver extends 
Link<StreamReceiver> {
 
     /**
      * Blocking receive method that waits the given time interval for the 
remote to provide a
-     * {@link StreamReceiverMessage} for consumption.  The amount of time this 
method blocks is based on the
-     * timeout value. If timeout is equal to <code>-1</code> then it blocks 
until a Delivery is
-     * received. If timeout is equal to zero then it will not block and simply 
return a
-     * {@link StreamReceiverMessage} if one is available locally.  If timeout 
value is greater than zero then it
-     * blocks up to timeout amount of time.
+     * {@link StreamReceiverMessage} for consumption. The amount of time this 
method blocks is based on
+     * the timeout value. If the timeout is negative then it blocks until a 
Delivery is received. If the
+     * timeout is equal to zero then it will not block and simply return a 
{@link StreamReceiverMessage}
+     * if one is available locally. If the timeout value is greater than zero 
then it blocks up to timeout
+     * amount of time.
      * <p>
      * Receive calls will only grant credit on their own if a credit window is 
configured in the
-     * {@link StreamReceiverOptions} which is done by default.  If the client 
application has not configured
+     * {@link StreamReceiverOptions} which is done by default. If the client 
application has not configured
      * a credit window or granted credit manually this method will not 
automatically grant any credit
      * when it enters the wait for a new incoming {@link 
StreamReceiverMessage}.
      *
diff --git 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiver.java
 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiver.java
index 1f7cafd4..82d38671 100644
--- 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiver.java
+++ 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiver.java
@@ -67,7 +67,7 @@ public final class ClientReceiver extends 
ClientReceiverLinkType<Receiver> imple
         checkClosedOrFailed();
 
         try {
-            ClientDelivery delivery = 
deliveryQueue.dequeue(units.toMillis(timeout));
+            ClientDelivery delivery = deliveryQueue.dequeue(Math.max(-1, 
units.toMillis(timeout)));
             if (delivery != null) {
                 if (options.autoAccept()) {
                     disposition(delivery.protonDelivery(), 
Accepted.getInstance(), options.autoSettle());
diff --git 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
index 9c7adcc4..23ab5dc9 100644
--- 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
+++ 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
@@ -2989,4 +2989,61 @@ public class ReceiverTest extends 
ImperativeClientTestCase {
             peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
         }
     }
+
+    @Test
+    public void testReceiveAcceptsTimeoutAndWaitsForDelivery() throws 
Exception {
+        doTestReceiveAcceptsNegtiveValuesAsInfiniteTimeouts(1, 
TimeUnit.MINUTES);
+    }
+
+    @Test
+    public void 
testReceiveAcceptsNegtiveValuesAsInfiniteTimeoutsMinusOneMillisends() throws 
Exception {
+        doTestReceiveAcceptsNegtiveValuesAsInfiniteTimeouts(-1, 
TimeUnit.MILLISECONDS);
+    }
+
+    @Test
+    public void 
testReceiveAcceptsNegtiveValuesAsInfiniteTimeoutsMinusOneSeconds() throws 
Exception {
+        doTestReceiveAcceptsNegtiveValuesAsInfiniteTimeouts(-1, 
TimeUnit.SECONDS);
+    }
+
+    public void doTestReceiveAcceptsNegtiveValuesAsInfiniteTimeouts(long 
timeout, TimeUnit units) throws Exception {
+        final byte[] payload = createEncodedMessage(new AmqpValue<>("Hello 
World"));
+
+        try (ProtonTestServer peer = new ProtonTestServer()) {
+            peer.expectSASLAnonymousConnect();
+            peer.expectOpen().respond();
+            peer.expectBegin().respond();
+            peer.expectAttach().ofReceiver().respond();
+            peer.expectFlow();
+            peer.remoteTransfer().withHandle(0)
+                                 .withDeliveryId(0)
+                                 .withDeliveryTag(new byte[] { 1 })
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).afterDelay(25).queue();
+            peer.expectDisposition().withFirst(0)
+                                    .withSettled(true)
+                                    .withState().accepted();
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Test started, peer listening on: {}", remoteURI);
+
+            Client container = Client.create();
+            Connection connection = container.connect(remoteURI.getHost(), 
remoteURI.getPort());
+            final Receiver receiver = connection.openReceiver("test-queue");
+            final Delivery delivery = receiver.receive(timeout, units);
+
+            peer.waitForScriptToComplete();
+            peer.expectDetach().respond();
+            peer.expectClose().respond();
+
+            assertNotNull(delivery);
+
+            receiver.close();
+            connection.close();
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+        }
+    }
 }
diff --git 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamReceiverTest.java
 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamReceiverTest.java
index 2ca78f0a..9fd3faa2 100644
--- 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamReceiverTest.java
+++ 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamReceiverTest.java
@@ -3860,6 +3860,64 @@ class StreamReceiverTest extends 
ImperativeClientTestCase {
        }
     }
 
+    @Test
+    public void testReceiveAcceptsTimeoutAndWaitsForDelivery() throws 
Exception {
+        doTestReceiveAcceptsNegtiveValuesAsInfiniteTimeouts(1, 
TimeUnit.MINUTES);
+    }
+
+    @Test
+    public void 
testReceiveAcceptsNegtiveValuesAsInfiniteTimeoutsMinusOneMillisends() throws 
Exception {
+        doTestReceiveAcceptsNegtiveValuesAsInfiniteTimeouts(-1, 
TimeUnit.MILLISECONDS);
+    }
+
+    @Test
+    public void 
testReceiveAcceptsNegtiveValuesAsInfiniteTimeoutsMinusOneSeconds() throws 
Exception {
+        doTestReceiveAcceptsNegtiveValuesAsInfiniteTimeouts(-1, 
TimeUnit.SECONDS);
+    }
+
+    public void doTestReceiveAcceptsNegtiveValuesAsInfiniteTimeouts(long 
timeout, TimeUnit units) throws Exception {
+        final byte[] payload = createEncodedMessage(new AmqpValue<>("Hello 
World"));
+
+        try (ProtonTestServer peer = new ProtonTestServer()) {
+            peer.expectSASLAnonymousConnect();
+            peer.expectOpen().respond();
+            peer.expectBegin().respond();
+            peer.expectAttach().ofReceiver().respond();
+            peer.expectFlow();
+            peer.remoteTransfer().withHandle(0)
+                                 .withDeliveryId(0)
+                                 .withDeliveryTag(new byte[] { 1 })
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).afterDelay(25).queue();
+            peer.expectDisposition().withFirst(0)
+                                    .withSettled(true)
+                                    .withState().accepted();
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Test started, peer listening on: {}", remoteURI);
+
+            Client container = Client.create();
+            Connection connection = container.connect(remoteURI.getHost(), 
remoteURI.getPort());
+            final StreamReceiver receiver = 
connection.openStreamReceiver("test-queue");
+            final StreamDelivery delivery = receiver.receive(timeout, units);
+
+            peer.waitForScriptToComplete();
+            peer.expectDetach().respond();
+            peer.expectEnd().respond();
+            peer.expectClose().respond();
+
+            assertNotNull(delivery);
+
+            receiver.close();
+            connection.close();
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+        }
+    }
+
     private byte[] createInvalidHeaderEncoding() {
         final byte[] buffer = new byte[12];
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to