This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch activemq-6.0.x
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/activemq-6.0.x by this push:
     new b2a808466 AMQ-9432 Disable default Jetty WebSocket idle timeout
b2a808466 is described below

commit b2a808466d6ec4412c9a6d6162ccf29fd4a0225f
Author: Albertas Vyšniauskas <[email protected]>
AuthorDate: Fri Feb 16 13:01:28 2024 +0200

    AMQ-9432 Disable default Jetty WebSocket idle timeout
    
    (cherry picked from commit 2ddc3c07466feb087a4d25958c0f45116eb8d0ca)
---
 .../activemq/transport/ws/StompWSConnection.java   |  2 +
 .../activemq/transport/ws/WSTransportProxy.java    |  2 +
 .../activemq/transport/ws/jetty11/MQTTSocket.java  |  2 +
 .../activemq/transport/ws/jetty11/StompSocket.java |  2 +
 .../activemq/transport/ws/MQTTWSConnection.java    |  2 +
 .../activemq/transport/ws/MQTTWSTransportTest.java | 55 +++++++++++++++++++++-
 .../transport/ws/StompWSTransportTest.java         | 22 +++++++++
 7 files changed, 86 insertions(+), 1 deletion(-)

diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/StompWSConnection.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/StompWSConnection.java
index 3d04847b3..8c0fccb1c 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/StompWSConnection.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/StompWSConnection.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.transport.ws;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingDeque;
@@ -143,6 +144,7 @@ public class StompWSConnection extends WebSocketAdapter implements WebSocketList
     @Override
     public void onWebSocketConnect(org.eclipse.jetty.websocket.api.Session session) {
         this.connection = session;
+        this.connection.setIdleTimeout(Duration.ZERO);
         this.connectLatch.countDown();
     }
 
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportProxy.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportProxy.java
index b23339436..a8242d4ed 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportProxy.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportProxy.java
@@ -19,6 +19,7 @@ package org.apache.activemq.transport.ws;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.security.cert.X509Certificate;
+import java.time.Duration;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -218,6 +219,7 @@ public final class WSTransportProxy extends TransportSupport implements Transpor
     @Override
     public void onWebSocketConnect(Session session) {
         this.session = session;
+        this.session.setIdleTimeout(Duration.ZERO);
 
         if (wsTransport.getMaxFrameSize() > 0) {
             this.session.getPolicy().setMaxBinaryMessageSize(wsTransport.getMaxFrameSize());
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty11/MQTTSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty11/MQTTSocket.java
index cbfd43113..853c201c3 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty11/MQTTSocket.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty11/MQTTSocket.java
@@ -18,6 +18,7 @@ package org.apache.activemq.transport.ws.jetty11;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.time.Duration;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -116,6 +117,7 @@ public class MQTTSocket extends AbstractMQTTSocket implements MQTTCodec.MQTTFram
     @Override
     public void onWebSocketConnect(Session session) {
         this.session = session;
+        this.session.setIdleTimeout(Duration.ZERO);
     }
 
     @Override
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty11/StompSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty11/StompSocket.java
index fb13bd256..5c718c8ab 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty11/StompSocket.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty11/StompSocket.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.transport.ws.jetty11;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.transport.stomp.Stomp;
@@ -86,6 +87,7 @@ public class StompSocket extends AbstractStompSocket implements WebSocketListene
     @Override
     public void onWebSocketConnect(Session session) {
         this.session = session;
+        this.session.setIdleTimeout(Duration.ZERO);
     }
 
     @Override
diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSConnection.java b/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSConnection.java
index 73cc0e6a4..e127d077d 100644
--- a/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSConnection.java
+++ b/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSConnection.java
@@ -18,6 +18,7 @@ package org.apache.activemq.transport.ws;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.time.Duration;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
@@ -282,6 +283,7 @@ public class MQTTWSConnection extends WebSocketAdapter implements WebSocketListe
     @Override
     public void onWebSocketConnect(org.eclipse.jetty.websocket.api.Session session) {
         this.connection = session;
+        this.connection.setIdleTimeout(Duration.ZERO);
         this.connectLatch.countDown();
     }
 }
diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSTransportTest.java b/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSTransportTest.java
index 9dc04a61a..671f77f64 100644
--- a/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSTransportTest.java
+++ b/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSTransportTest.java
@@ -229,7 +229,7 @@ public class MQTTWSTransportTest extends WSTransportTestSupport {
 
         TimeUnit.SECONDS.sleep(10);
 
-        assertTrue("Connection should still open", Wait.waitFor(new Wait.Condition() {
+        assertTrue("Connection should still be open", Wait.waitFor(new Wait.Condition() {
 
             @Override
             public boolean isSatisified() throws Exception {
@@ -258,4 +258,57 @@ public class MQTTWSTransportTest extends WSTransportTestSupport {
             }
         }));
     }
+
+    @Test(timeout = 60000)
+    public void testNoDefaultJettyWebSocketIdleTimeout() throws Exception {
+
+        final AtomicBoolean done = new AtomicBoolean();
+
+        CONNECT command = new CONNECT();
+
+        command.clientId(new UTF8Buffer(UUID.randomUUID().toString()));
+        command.cleanSession(false);
+        command.version(3);
+        command.keepAlive((short) 60);
+
+        wsMQTTConnection.sendFrame(command.encode());
+
+        MQTTFrame received = wsMQTTConnection.receive(15, TimeUnit.SECONDS);
+        if (received == null || received.messageType() != CONNACK.TYPE) {
+            fail("Client did not get expected CONNACK");
+        }
+
+        assertTrue("Connection should open", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getProxyToBroker().getCurrentConnectionsCount() == 1;
+            }
+        }));
+
+        TimeUnit.SECONDS.sleep(35);
+
+        assertTrue("Connection should still be open", getProxyToBroker().getCurrentConnectionsCount() == 1);
+
+        wsMQTTConnection.disconnect();
+        wsMQTTConnection.close();
+
+        done.set(true);
+
+        assertTrue("Connection should close", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getProxyToBroker().getCurrentConnectionsCount() == 0;
+            }
+        }));
+
+        assertTrue("Client Connection should close", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return !wsMQTTConnection.isConnected();
+            }
+        }));
+    }
 }
diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSTransportTest.java b/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSTransportTest.java
index cd17fc9d4..40e3290ac 100644
--- a/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSTransportTest.java
+++ b/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSTransportTest.java
@@ -341,4 +341,26 @@ public class StompWSTransportTest extends WSTransportTestSupport {
             LOG.info("Caught exception on write of disconnect", ex);
         }
     }
+
+    @Test(timeout = 60000)
+    public void testNoDefaultJettyWebSocketIdleTimeout() throws Exception {
+        String connectFrame = "STOMP\n" +
+                              "login:system\n" +
+                              "passcode:manager\n" +
+                              "accept-version:1.1\n" +
+                              "heart-beat:60000,0\n" +
+                              "host:localhost\n" +
+                              "\n" + Stomp.NULL;
+
+        wsStompConnection.sendRawFrame(connectFrame);
+        String incoming = wsStompConnection.receive(30, TimeUnit.SECONDS);
+        assertTrue(incoming.startsWith("CONNECTED"));
+        assertTrue(incoming.indexOf("version:1.1") >= 0);
+        assertTrue(incoming.indexOf("heart-beat:") >= 0);
+        assertTrue(incoming.indexOf("session:") >= 0);
+
+        TimeUnit.SECONDS.sleep(35);
+
+        wsStompConnection.sendFrame(new StompFrame(Stomp.Commands.DISCONNECT));
+    }
 }

Reply via email to