This is an automated email from the ASF dual-hosted git repository. tabish pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push: new 7b9cddbc PROTON-2900 Ensure idle timeout checks are scheduled on remote open 7b9cddbc is described below commit 7b9cddbc8c5cffc18c59d5427944925946157b13 Author: Timothy Bish <tabish...@gmail.com> AuthorDate: Fri Jul 18 11:20:31 2025 -0400 PROTON-2900 Ensure idle timeout checks are scheduled on remote open When the remote open performative arrives update the auto idle scheduled task if it exists such that a shorter remote idle timeout than the local value is honored by the write check deadline computation. --- .../protonj2/client/impl/ClientConnection.java | 3 +- .../qpid/protonj2/client/impl/ConnectionTest.java | 27 ++++++++ .../org/apache/qpid/protonj2/engine/Engine.java | 12 +++- .../protonj2/engine/impl/ProtonConnection.java | 4 +- .../qpid/protonj2/engine/impl/ProtonEngine.java | 73 ++++++++++++++++------ .../protonj2/engine/impl/ProtonEngineTest.java | 11 +--- 6 files changed, 99 insertions(+), 31 deletions(-) diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java index f0542a06..2a3ba67b 100644 --- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java +++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java @@ -620,8 +620,6 @@ public final class ClientConnection implements Connection { //----- Private implementation events handlers and utility methods private void handleLocalOpen(org.apache.qpid.protonj2.engine.Connection connection) { - connection.tickAuto(getScheduler()); - if (options.openTimeout() > 0) { executor.schedule(() -> { if (!openFuture.isDone()) { @@ -921,6 +919,7 @@ public final class ClientConnection implements Connection { .localCloseHandler(this::handleLocalClose) .openHandler(this::handleRemoteOpen) .closeHandler(this::handleRemoteClose); + 
protonConnection.tickAuto(getScheduler()); configureEngineSaslSupport(); } diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ConnectionTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ConnectionTest.java index e6915238..bb5afaa9 100644 --- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ConnectionTest.java +++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ConnectionTest.java @@ -1865,6 +1865,33 @@ public class ConnectionTest extends ImperativeClientTestCase { } } + @Test + public void testConnectToRemoteWhoseIdleTimeoutIsShorterThanLocalSetValue() throws Exception { + try (ProtonTestServer peer = new ProtonTestServer(testServerOptions())) { + peer.expectSASLAnonymousConnect(); + peer.expectOpen().withIdleTimeOut(15_000).respond().withIdleTimeOut(1000); + peer.expectEmptyFrame(); + peer.start(); + + URI remoteURI = peer.getServerURI(); + + LOG.info("Connect test started, peer listening on: {}", remoteURI); + + Client container = Client.create(); + ConnectionOptions options = connectionOptions().idleTimeout(30_000); + Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options); + + connection.openFuture().get(10, TimeUnit.SECONDS); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectClose().respond(); + + connection.closeAsync().get(10, TimeUnit.SECONDS); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + } + } + @Disabled("Disabled due to requirement of hard coded port") @Test public void testLocalPortOption() throws Exception { diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/Engine.java b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/Engine.java index f4bda43c..a0ad5136 100644 --- a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/Engine.java +++ b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/Engine.java @@ -187,12 +187,16 @@ public interface Engine extends 
Consumer<ProtonBuffer> { * Allows the engine to manage idle timeout processing by providing it the single threaded executor * context where all transport work is done which ensures singled threaded access while removing the * need for the client library or server application to manage calls to the {@link Engine#tick} methods. + * <p> + * The API should allow for configuring auto idle timeout handling before the connection has opened and + * should react to both local and remote open performatives passing through the engine to configure read + * and write checks under the constraints of the local and remote idle timeout configurations. * * @param executor * The single threaded execution context where all engine work takes place. * * @throws IllegalStateException if the {@link Engine} is already performing auto tick handling. - * @throws EngineStateException if the Engine state precludes accepting new input. + * @throws EngineStateException if the Engine state precludes accepting new input (shutdown or failed). * * @return this {@link Engine} */ @@ -202,12 +206,16 @@ public interface Engine extends Consumer<ProtonBuffer> { * Allows the engine to manage idle timeout processing by providing it the single threaded executor * context where all transport work is done which ensures singled threaded access while removing the * need for the client library or server application to manage calls to the {@link Engine#tick} methods. + * <p> + * The API should allow for configuring auto idle timeout handling before the connection has opened and + * should react to both local and remote open performatives passing through the engine to configure read + * and write checks under the constraints of the local and remote idle timeout configurations. * * @param scheduler * The single threaded execution context where all engine work takes place. * * @throws IllegalStateException if the {@link Engine} is already performing auto tick handling. 
- * @throws EngineStateException if the Engine state precludes accepting new input. + * @throws EngineStateException if the Engine state precludes accepting new input (shutdown or failed). * * @return this {@link Engine} */ diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonConnection.java b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonConnection.java index 185f0719..e22cee6a 100644 --- a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonConnection.java +++ b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonConnection.java @@ -94,7 +94,7 @@ public class ProtonConnection extends ProtonEndpoint<Connection> implements Conn * Create a new unbound Connection instance. * * @param engine - * Parent engine that created and owns this {@link Connection} insatnce. + * Parent engine that created and owns this {@link Connection} instance. */ ProtonConnection(ProtonEngine engine) { super(engine); @@ -128,6 +128,7 @@ public class ProtonConnection extends ProtonEndpoint<Connection> implements Conn syncLocalStateWithRemote(); } finally { fireLocalOpen(); + engine.handleLocalOpen(this); } } @@ -446,6 +447,7 @@ public class ProtonConnection extends ProtonEndpoint<Connection> implements Conn remoteOpen = open; fireRemoteOpen(); + engine.handleRemoteOpen(this); } @Override diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonEngine.java b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonEngine.java index b7904e14..a4c8a93e 100644 --- a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonEngine.java +++ b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonEngine.java @@ -25,6 +25,7 @@ import java.util.function.BiConsumer; import org.apache.qpid.protonj2.buffer.ProtonBuffer; import org.apache.qpid.protonj2.engine.AMQPPerformativeEnvelopePool; +import org.apache.qpid.protonj2.engine.Connection; import 
org.apache.qpid.protonj2.engine.ConnectionState; import org.apache.qpid.protonj2.engine.Engine; import org.apache.qpid.protonj2.engine.EnginePipeline; @@ -232,20 +233,16 @@ public class ProtonEngine implements Engine { Objects.requireNonNull(executor); - if (connection.getState() != ConnectionState.ACTIVE) { - throw new IllegalStateException("Cannot tick on a Connection that is not opened."); + if (connection.getState() == ConnectionState.CLOSED) { + throw new IllegalStateException("Cannot tick on a Connection that is closed."); } if (idleTimeoutExecutor != null) { throw new IllegalStateException("Automatic ticking previously initiated."); } - // TODO - As an additional feature of this method we could allow for calling before connection is - // opened such that it starts ticking either on open local and also checks as a response to - // remote open which seems might be needed anyway, see notes in IdleTimeoutCheck class. - // Immediate run of the idle timeout check logic will decide afterwards when / if we should - // reschedule the idle timeout processing. + // reschedule the idle timeout processing based on the local and remote state of the connection. LOG.trace("Auto Idle Timeout Check being initiated"); idleTimeoutExecutor = executor; idleTimeoutExecutor.execute(new IdleTimeoutCheck()); @@ -380,6 +377,48 @@ public class ProtonEngine implements Engine { //----- Internal proton engine implementation + /** + * Called from the {@link Connection} that is linked to this engine when the {@link Connection#open()} + * method is called and the connection has configured and updated its state accordingly. + * + * @param connection + * The connection associated with this engine instance. + */ + void handleLocalOpen(ProtonConnection connection) { + // When locally opened run the idle timeout check once after canceling any + // currently scheduled instance to prevent any stacking of checks. 
This will + // update the schedule to ensure local side idle timeouts get applied on time. + if (idleTimeoutExecutor != null) { + if (nextIdleTimeoutCheck != null) { + nextIdleTimeoutCheck.cancel(false); + nextIdleTimeoutCheck = null; + } + + idleTimeoutExecutor.execute(new IdleTimeoutCheck()); + } + } + + /** + * Called from the {@link Connection} that is linked to this engine after it has received a remote + * Open performative and the connection has configured and updated its state accordingly. + * + * @param connection + * The connection associated with this engine instance. + */ + void handleRemoteOpen(ProtonConnection connection) { + // When remotely opened run the idle timeout check once after canceling any + // currently scheduled instance to prevent any stacking of checks. This will + // update the schedule to ensure remote side idle timeouts get applied on time. + if (idleTimeoutExecutor != null) { + if (nextIdleTimeoutCheck != null) { + nextIdleTimeoutCheck.cancel(false); + nextIdleTimeoutCheck = null; + } + + idleTimeoutExecutor.execute(new IdleTimeoutCheck()); + } + } + ProtonEngine fireWrite(HeaderEnvelope frame) { pipeline.fireWrite(frame); return this; @@ -507,15 +546,21 @@ public class ProtonEngine implements Engine { private final class IdleTimeoutCheck implements Runnable { - // TODO - Pick reasonable values private final long MIN_IDLE_CHECK_INTERVAL = 1000; private final long MAX_IDLE_CHECK_INTERVAL = 10000; @Override public void run() { boolean checkScheduled = false; + boolean locallyOpen = connection.getState() == ConnectionState.ACTIVE; - if (connection.getState() == ConnectionState.ACTIVE && !isShutdown()) { + // Ensure that any check currently scheduled is canceled and cleared to prevent stacking.
+ if (nextIdleTimeoutCheck != null) { + nextIdleTimeoutCheck.cancel(false); + nextIdleTimeoutCheck = null; + } + + if (locallyOpen && !isShutdown()) { // Using nano time since it is not related to the wall clock, which may change long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); @@ -527,13 +572,11 @@ public class ProtonEngine implements Engine { // Check methods will close down the engine and fire error so we need to check that engine // state is active and engine is not shutdown before scheduling again. - if (deadline != 0 && connection.getState() == ConnectionState.ACTIVE && state() == EngineState.STARTED) { + if (deadline != 0 && locallyOpen && state() == EngineState.STARTED) { // Run the next idle check at half the deadline to try and ensure we meet our // obligation of sending our heart beat on time. long delay = (deadline - now) / 2; - // TODO - Some computation to work out a reasonable delay that still compensates for - // errors in scheduling while preventing over eagerness. delay = Math.max(MIN_IDLE_CHECK_INTERVAL, delay); delay = Math.min(MAX_IDLE_CHECK_INTERVAL, delay); @@ -541,18 +584,12 @@ public class ProtonEngine implements Engine { LOG.trace("IdleTimeoutCheck rescheduling with delay: {}", delay); nextIdleTimeoutCheck = idleTimeoutExecutor.schedule(this, delay, TimeUnit.MILLISECONDS); } - - // TODO - If no local timeout but remote hasn't opened we might return zero and not - // schedule any ticking ? Possible solution is to schedule after remote open - // arrives if nothing set to run and remote indicates it has an idle timeout. 
- } catch (Throwable t) { LOG.trace("Auto Idle Timeout Check encountered error during check: ", t); } } if (!checkScheduled) { - nextIdleTimeoutCheck = null; LOG.trace("Auto Idle Timeout Check task exiting and will not be rescheduled"); } } diff --git a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonEngineTest.java b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonEngineTest.java index 21a465c5..c31f65e1 100644 --- a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonEngineTest.java +++ b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonEngineTest.java @@ -367,13 +367,8 @@ public class ProtonEngineTest extends ProtonEngineTestSupport { } @Test - public void testAutoTickFailsWhenConnectionNotOpenedNoLocalIdleSet() throws EngineStateException { - doTestAutoTickFailsBasedOnState(false, false, false, false); - } - - @Test - public void testAutoTickFailsWhenConnectionNotOpenedLocalIdleSet() throws EngineStateException { - doTestAutoTickFailsBasedOnState(true, false, false, false); + public void testAutoTickFailsWhenConnectionNotClosedButEngineShutdownNoLocalIdleSet() throws EngineStateException { + doTestAutoTickFailsBasedOnState(false, true, false, true); } @Test @@ -418,7 +413,7 @@ public class ProtonEngineTest extends ProtonEngineTestSupport { try { engine.tickAuto(Mockito.mock(ScheduledExecutorService.class)); - fail("Should not be able to tick an unopened connection"); + fail("Should not be able to tick an closed connection or shutdown engine"); } catch (IllegalStateException | EngineShutdownException error) { } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org