This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git


The following commit(s) were added to refs/heads/main by this push:
     new 7ded26ed PROTON-2807 Account for zero copy buffers on ingest activity tracking
7ded26ed is described below

commit 7ded26ed54d1badf0017983fd10dffc78022523d
Author: Timothy Bish <[email protected]>
AuthorDate: Fri Mar 15 13:55:20 2024 -0400

    PROTON-2807 Account for zero copy buffers on ingest activity tracking
    
    Update data ingest in the engine to account for the zero-copy mechanics
    of proton buffers, which caused the update to the incoming sequence
    tracker to be missed and led to premature read-check idle timeout errors.
---
 .../qpid/protonj2/engine/impl/ProtonEngine.java    | 14 ++---
 .../protonj2/engine/impl/ProtonEngineTest.java     | 69 ++++++++++++++++++++++
 2 files changed, 76 insertions(+), 7 deletions(-)

diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonEngine.java b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonEngine.java
index a6d59af0..b7904e14 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonEngine.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonEngine.java
@@ -195,7 +195,7 @@ public class ProtonEngine implements Engine {
             private final ScheduledExecutorService service = executor;
 
             @Override
-                       public boolean isShutdown() {
+            public boolean isShutdown() {
                 return service.isShutdown();
             }
 
@@ -261,14 +261,14 @@ public class ProtonEngine implements Engine {
             throw new EngineNotWritableException("Engine is currently not 
accepting new input");
         }
 
-        try {
-            final int startIndex = input.getReadOffset();
-            pipeline.fireRead(input);
-            if (input.getReadOffset() != startIndex) {
+        if (input.isReadable()) {
+            try {
+                pipeline.fireRead(input);
+            } catch (Exception error) {
+                throw engineFailed(error);
+            } finally {
                 inputSequence++;
             }
-        } catch (Exception error) {
-            throw engineFailed(error);
         }
 
         return this;
diff --git a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonEngineTest.java b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonEngineTest.java
index a5fb3484..21a465c5 100644
--- a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonEngineTest.java
+++ b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonEngineTest.java
@@ -29,6 +29,7 @@ import static org.junit.jupiter.api.Assertions.fail;
 
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.security.sasl.SaslException;
 
@@ -40,6 +41,8 @@ import org.apache.qpid.protonj2.engine.Engine;
 import org.apache.qpid.protonj2.engine.EngineFactory;
 import org.apache.qpid.protonj2.engine.EngineState;
 import org.apache.qpid.protonj2.engine.HeaderEnvelope;
+import org.apache.qpid.protonj2.engine.IncomingDelivery;
+import org.apache.qpid.protonj2.engine.Receiver;
 import org.apache.qpid.protonj2.engine.SASLEnvelope;
 import org.apache.qpid.protonj2.engine.Session;
 import org.apache.qpid.protonj2.engine.exceptions.EngineFailedException;
@@ -1378,4 +1381,70 @@ public class ProtonEngineTest extends ProtonEngineTestSupport {
 
         peer.waitForScriptToComplete();
     }
+
+    @Test
+    public void testSlowFrameCoalesceDoesNotTriggerReadIdleTimeout() throws 
Exception {
+        // Frame data for: Transfer
+        //   Transfer{handle=0, deliveryId=1, deliveryTag=\x00\x01, 
messageFormat=null, settled=true, more=false,
+        //            rcvSettleMode=null, state=null, resume=false, 
aborted=false, batchable=false}
+        //   payload of size: 169
+        final byte[] completedTransfer1 = new byte[] {
+            0, 0, 0, -63, 2, 0, 0, 0, 0, 83, 20, -64, 11, 5, 82, 0, 82, 1, 
-96, 2, 0, 1, 64, 65, 0, 83, 115,
+            -48, 0, 0, 0, 28, 0, 0, 0, 3, -104, -107, -75, 19, 123, 103, 50, 
77, 43, -73, 93, 29, 105, 64};
+        final byte[] completedTransfer2 = new byte[] {
+            -84, 45, 110, 64, -95, 4, 116, 101, 115, 116, 0, 83, 116, -63, 23, 
2, -95, 9, 116, 105, 109, 101,
+            115, 116, 97, 109, 112, -95, 9, 49, 50, 51, 52, 53, 54, 55, 56, 
57, 0, 83, 117, -96, 100, 65, 65};
+        final byte[] completedTransfer3 = new byte[] {
+            65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 
65, 65, 65, 65, 65, 65, 65, 65, 65,
+            65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 
65, 65, 65, 65, 65, 65, 65, 65, 65,
+            65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 
65, 65, 65, 65, 65, 65, 65, 65, 65,
+            65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 
65, 65, 65, 65, 65, 65, 65};
+
+        final Engine engine = EngineFactory.PROTON.createNonSaslEngine();
+        engine.errorHandler(result -> failure = result.failureCause());
+        final ProtonTestConnector peer = createTestPeer(engine);
+
+        peer.expectAMQPHeader().respondWithAMQPHeader();
+        peer.expectOpen().withIdleTimeOut(1000).respond().withIdleTimeOut(0);
+        peer.expectBegin().respond();
+        peer.expectAttach().respond();
+        peer.expectFlow();
+
+        final Connection connection = 
engine.start().setIdleTimeout(1000).open();
+        final Session session = connection.session().open();
+        final Receiver receiver = 
session.receiver("test").open().addCredit(10);
+        final AtomicReference<IncomingDelivery> receivedDelivery = new 
AtomicReference<>();
+        final AtomicBoolean deliveryArrived = new AtomicBoolean();
+
+        receiver.deliveryReadHandler(d -> {
+            deliveryArrived.set(true);
+            receivedDelivery.set(d);
+        });
+
+        // Initial tick sets first deadline
+        assertEquals(2000, connection.tick(1000));
+
+        peer.remoteBytes().withBytes(completedTransfer1).now();
+        assertEquals(2500, connection.tick(1500));
+
+        peer.remoteBytes().withBytes(completedTransfer2).now();
+        assertEquals(3000, connection.tick(2000));
+
+        peer.remoteBytes().withBytes(completedTransfer3).now();
+        assertEquals(3500, connection.tick(2500));
+
+        peer.waitForScriptToComplete();
+        peer.expectDetach().respond();
+        peer.expectEnd().respond();
+
+        assertTrue(deliveryArrived.get());
+        assertNotNull(receivedDelivery.get());
+
+        receiver.detach();
+        session.close();
+
+        peer.waitForScriptToComplete();
+
+        assertNull(failure);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to