This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git


The following commit(s) were added to refs/heads/main by this push:
     new 7b9cddbc PROTON-2900 Ensure idle timeout checks are scheduled on 
remote open
7b9cddbc is described below

commit 7b9cddbc8c5cffc18c59d5427944925946157b13
Author: Timothy Bish <tabish...@gmail.com>
AuthorDate: Fri Jul 18 11:20:31 2025 -0400

    PROTON-2900 Ensure idle timeout checks are scheduled on remote open
    
    When the remote open performative arrives update the auto idle scheduled 
task
    if it exists such that a shorter remote idle timeout than the local value is
    honored by the write check deadline computation.
---
 .../protonj2/client/impl/ClientConnection.java     |  3 +-
 .../qpid/protonj2/client/impl/ConnectionTest.java  | 27 ++++++++
 .../org/apache/qpid/protonj2/engine/Engine.java    | 12 +++-
 .../protonj2/engine/impl/ProtonConnection.java     |  4 +-
 .../qpid/protonj2/engine/impl/ProtonEngine.java    | 73 ++++++++++++++++------
 .../protonj2/engine/impl/ProtonEngineTest.java     | 11 +---
 6 files changed, 99 insertions(+), 31 deletions(-)

diff --git 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java
 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java
index f0542a06..2a3ba67b 100644
--- 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java
+++ 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java
@@ -620,8 +620,6 @@ public final class ClientConnection implements Connection {
     //----- Private implementation events handlers and utility methods
 
     private void handleLocalOpen(org.apache.qpid.protonj2.engine.Connection 
connection) {
-        connection.tickAuto(getScheduler());
-
         if (options.openTimeout() > 0) {
             executor.schedule(() -> {
                 if (!openFuture.isDone()) {
@@ -921,6 +919,7 @@ public final class ClientConnection implements Connection {
                         .localCloseHandler(this::handleLocalClose)
                         .openHandler(this::handleRemoteOpen)
                         .closeHandler(this::handleRemoteClose);
+        protonConnection.tickAuto(getScheduler());
 
         configureEngineSaslSupport();
     }
diff --git 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ConnectionTest.java
 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ConnectionTest.java
index e6915238..bb5afaa9 100644
--- 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ConnectionTest.java
+++ 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ConnectionTest.java
@@ -1865,6 +1865,33 @@ public class ConnectionTest extends 
ImperativeClientTestCase {
         }
     }
 
+    @Test
+    public void 
testConnectToRemoteWhoseIdleTimeoutIsShorterThanLocalSetValue() throws 
Exception {
+        try (ProtonTestServer peer = new 
ProtonTestServer(testServerOptions())) {
+            peer.expectSASLAnonymousConnect();
+            
peer.expectOpen().withIdleTimeOut(15_000).respond().withIdleTimeOut(1000);
+            peer.expectEmptyFrame();
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Connect test started, peer listening on: {}", remoteURI);
+
+            Client container = Client.create();
+            ConnectionOptions options = 
connectionOptions().idleTimeout(30_000);
+            Connection connection = container.connect(remoteURI.getHost(), 
remoteURI.getPort(), options);
+
+            connection.openFuture().get(10, TimeUnit.SECONDS);
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            peer.expectClose().respond();
+
+            connection.closeAsync().get(10, TimeUnit.SECONDS);
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+        }
+    }
+
     @Disabled("Disabled due to requirement of hard coded port")
     @Test
     public void testLocalPortOption() throws Exception {
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/Engine.java 
b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/Engine.java
index f4bda43c..a0ad5136 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/Engine.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/Engine.java
@@ -187,12 +187,16 @@ public interface Engine extends Consumer<ProtonBuffer> {
      * Allows the engine to manage idle timeout processing by providing it the 
single threaded executor
      * context where all transport work is done which ensures singled threaded 
access while removing the
      * need for the client library or server application to manage calls to 
the {@link Engine#tick} methods.
+     * <p>
+     * The API should allow for configuring auto idle timeout handling before 
the connection has opened and
+     * should react to both local and remote open performatives passing 
through the engine to configure read
+     * and write checks under the constraints of the local and remote idle 
timeout configurations.
      *
      * @param executor
      *      The single threaded execution context where all engine work takes 
place.
      *
      * @throws IllegalStateException if the {@link Engine} is already 
performing auto tick handling.
-     * @throws EngineStateException if the Engine state precludes accepting 
new input.
+     * @throws EngineStateException if the Engine state precludes accepting 
new input (shutdown or failed).
      *
      * @return this {@link Engine}
      */
@@ -202,12 +206,16 @@ public interface Engine extends Consumer<ProtonBuffer> {
      * Allows the engine to manage idle timeout processing by providing it the 
single threaded executor
      * context where all transport work is done which ensures singled threaded 
access while removing the
      * need for the client library or server application to manage calls to 
the {@link Engine#tick} methods.
+     * <p>
+     * The API should allow for configuring auto idle timeout handling before 
the connection has opened and
+     * should react to both local and remote open performatives passing 
through the engine to configure read
+     * and write checks under the constraints of the local and remote idle 
timeout configurations.
      *
      * @param scheduler
      *      The single threaded execution context where all engine work takes 
place.
      *
      * @throws IllegalStateException if the {@link Engine} is already 
performing auto tick handling.
-     * @throws EngineStateException if the Engine state precludes accepting 
new input.
+     * @throws EngineStateException if the Engine state precludes accepting 
new input (shutdown or failed).
      *
      * @return this {@link Engine}
      */
diff --git 
a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonConnection.java
 
b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonConnection.java
index 185f0719..e22cee6a 100644
--- 
a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonConnection.java
+++ 
b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonConnection.java
@@ -94,7 +94,7 @@ public class ProtonConnection extends 
ProtonEndpoint<Connection> implements Conn
      * Create a new unbound Connection instance.
      *
      * @param engine
-     *                 Parent engine that created and owns this {@link 
Connection} insatnce.
+     *                 Parent engine that created and owns this {@link 
Connection} instance.
      */
     ProtonConnection(ProtonEngine engine) {
         super(engine);
@@ -128,6 +128,7 @@ public class ProtonConnection extends 
ProtonEndpoint<Connection> implements Conn
                 syncLocalStateWithRemote();
             } finally {
                 fireLocalOpen();
+                engine.handleLocalOpen(this);
             }
         }
 
@@ -446,6 +447,7 @@ public class ProtonConnection extends 
ProtonEndpoint<Connection> implements Conn
         remoteOpen = open;
 
         fireRemoteOpen();
+        engine.handleRemoteOpen(this);
     }
 
     @Override
diff --git 
a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonEngine.java 
b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonEngine.java
index b7904e14..a4c8a93e 100644
--- 
a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonEngine.java
+++ 
b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonEngine.java
@@ -25,6 +25,7 @@ import java.util.function.BiConsumer;
 
 import org.apache.qpid.protonj2.buffer.ProtonBuffer;
 import org.apache.qpid.protonj2.engine.AMQPPerformativeEnvelopePool;
+import org.apache.qpid.protonj2.engine.Connection;
 import org.apache.qpid.protonj2.engine.ConnectionState;
 import org.apache.qpid.protonj2.engine.Engine;
 import org.apache.qpid.protonj2.engine.EnginePipeline;
@@ -232,20 +233,16 @@ public class ProtonEngine implements Engine {
 
         Objects.requireNonNull(executor);
 
-        if (connection.getState() != ConnectionState.ACTIVE) {
-            throw new IllegalStateException("Cannot tick on a Connection that 
is not opened.");
+        if (connection.getState() == ConnectionState.CLOSED) {
+            throw new IllegalStateException("Cannot tick on a Connection that 
is closed.");
         }
 
         if (idleTimeoutExecutor != null) {
             throw new IllegalStateException("Automatic ticking previously 
initiated.");
         }
 
-        // TODO - As an additional feature of this method we could allow for 
calling before connection is
-        //        opened such that it starts ticking either on open local and 
also checks as a response to
-        //        remote open which seems might be needed anyway, see notes in 
IdleTimeoutCheck class.
-
         // Immediate run of the idle timeout check logic will decide 
afterwards when / if we should
-        // reschedule the idle timeout processing.
+        // reschedule the idle timeout processing based on the local and 
remote state of the connection.
         LOG.trace("Auto Idle Timeout Check being initiated");
         idleTimeoutExecutor = executor;
         idleTimeoutExecutor.execute(new IdleTimeoutCheck());
@@ -380,6 +377,48 @@ public class ProtonEngine implements Engine {
 
     //----- Internal proton engine implementation
 
+    /**
+     * Called from the {@link Connection} that is linked to this engine when 
the {@link Connection#open()}
+     * method is called and the connection has configured and updated its 
state accordingly.
+     *
+     * @param connection
+     *                 The connection associated with this engine instance.
+     */
+    void handleLocalOpen(ProtonConnection connection) {
+        // When locally opened run the idle timeout check once after canceling 
any
+        // currently scheduled instance to prevent any stacking of checks. 
This will
+        // update the schedule to ensure local side idle timeouts get applied 
on time.
+        if (idleTimeoutExecutor != null) {
+            if (nextIdleTimeoutCheck != null) {
+                nextIdleTimeoutCheck.cancel(false);
+                nextIdleTimeoutCheck = null;
+            }
+
+            idleTimeoutExecutor.execute(new IdleTimeoutCheck());
+        }
+    }
+
+    /**
+     * Called from the {@link Connection} that is linked to this engine after 
is has received a remote
+     * Close performative and the connection has configured and updated its 
state accordingly.
+     *
+     * @param connection
+     *                 The connection associated with this engine instance.
+     */
+    void handleRemoteOpen(ProtonConnection connection) {
+        // When remotely opened run the idle timeout check once after 
canceling any
+        // currently scheduled instance to prevent any stacking of checks. 
This will
+        // update the schedule to ensure remote side idle timeouts get applied 
on time.
+        if (idleTimeoutExecutor != null) {
+            if (nextIdleTimeoutCheck != null) {
+                nextIdleTimeoutCheck.cancel(false);
+                nextIdleTimeoutCheck = null;
+            }
+
+            idleTimeoutExecutor.execute(new IdleTimeoutCheck());
+        }
+    }
+
     ProtonEngine fireWrite(HeaderEnvelope frame) {
         pipeline.fireWrite(frame);
         return this;
@@ -507,15 +546,21 @@ public class ProtonEngine implements Engine {
 
     private final class IdleTimeoutCheck implements Runnable {
 
-        // TODO - Pick reasonable values
         private final long MIN_IDLE_CHECK_INTERVAL = 1000;
         private final long MAX_IDLE_CHECK_INTERVAL = 10000;
 
         @Override
         public void run() {
             boolean checkScheduled = false;
+            boolean locallyOpen = connection.getState() == 
ConnectionState.ACTIVE;
 
-            if (connection.getState() == ConnectionState.ACTIVE && 
!isShutdown()) {
+            // Ensure that any check currently scheduled is canceled and 
cleared to prevent stacking.
+            if (nextIdleTimeoutCheck != null) {
+                nextIdleTimeoutCheck.cancel(false);
+                nextIdleTimeoutCheck = null;
+            }
+
+            if (locallyOpen && !isShutdown()) {
                 // Using nano time since it is not related to the wall clock, 
which may change
                 long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
 
@@ -527,13 +572,11 @@ public class ProtonEngine implements Engine {
 
                     // Check methods will close down the engine and fire error 
so we need to check that engine
                     // state is active and engine is not shutdown before 
scheduling again.
-                    if (deadline != 0 && connection.getState() == 
ConnectionState.ACTIVE && state() == EngineState.STARTED) {
+                    if (deadline != 0 && locallyOpen && state() == 
EngineState.STARTED) {
                         // Run the next idle check at half the deadline to try 
and ensure we meet our
                         // obligation of sending our heart beat on time.
                         long delay = (deadline - now) / 2;
 
-                        // TODO - Some computation to work out a reasonable 
delay that still compensates for
-                        //        errors in scheduling while preventing over 
eagerness.
                         delay = Math.max(MIN_IDLE_CHECK_INTERVAL, delay);
                         delay = Math.min(MAX_IDLE_CHECK_INTERVAL, delay);
 
@@ -541,18 +584,12 @@ public class ProtonEngine implements Engine {
                         LOG.trace("IdleTimeoutCheck rescheduling with delay: 
{}", delay);
                         nextIdleTimeoutCheck = 
idleTimeoutExecutor.schedule(this, delay, TimeUnit.MILLISECONDS);
                     }
-
-                    // TODO - If no local timeout but remote hasn't opened we 
might return zero and not
-                    //        schedule any ticking ?  Possible solution is to 
schedule after remote open
-                    //        arrives if nothing set to run and remote 
indicates it has an idle timeout.
-
                 } catch (Throwable t) {
                     LOG.trace("Auto Idle Timeout Check encountered error 
during check: ", t);
                 }
             }
 
             if (!checkScheduled) {
-                nextIdleTimeoutCheck = null;
                 LOG.trace("Auto Idle Timeout Check task exiting and will not 
be rescheduled");
             }
         }
diff --git 
a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonEngineTest.java
 
b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonEngineTest.java
index 21a465c5..c31f65e1 100644
--- 
a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonEngineTest.java
+++ 
b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonEngineTest.java
@@ -367,13 +367,8 @@ public class ProtonEngineTest extends 
ProtonEngineTestSupport {
     }
 
     @Test
-    public void testAutoTickFailsWhenConnectionNotOpenedNoLocalIdleSet() 
throws EngineStateException {
-        doTestAutoTickFailsBasedOnState(false, false, false, false);
-    }
-
-    @Test
-    public void testAutoTickFailsWhenConnectionNotOpenedLocalIdleSet() throws 
EngineStateException {
-        doTestAutoTickFailsBasedOnState(true, false, false, false);
+    public void 
testAutoTickFailsWhenConnectionNotClosedButEngineShutdownNoLocalIdleSet() 
throws EngineStateException {
+        doTestAutoTickFailsBasedOnState(false, true, false, true);
     }
 
     @Test
@@ -418,7 +413,7 @@ public class ProtonEngineTest extends 
ProtonEngineTestSupport {
 
         try {
             engine.tickAuto(Mockito.mock(ScheduledExecutorService.class));
-            fail("Should not be able to tick an unopened connection");
+            fail("Should not be able to tick an closed connection or shutdown 
engine");
         } catch (IllegalStateException | EngineShutdownException error) {
         }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to