This is an automated email from the ASF dual-hosted git repository.
jbonofre pushed a commit to branch activemq-6.0.x
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/activemq-6.0.x by this push:
new b2a808466 AMQ-9432 Disable default Jetty WebSocket idle timeout
b2a808466 is described below
commit b2a808466d6ec4412c9a6d6162ccf29fd4a0225f
Author: Albertas Vyšniauskas <[email protected]>
AuthorDate: Fri Feb 16 13:01:28 2024 +0200
AMQ-9432 Disable default Jetty WebSocket idle timeout
(cherry picked from commit 2ddc3c07466feb087a4d25958c0f45116eb8d0ca)
---
.../activemq/transport/ws/StompWSConnection.java | 2 +
.../activemq/transport/ws/WSTransportProxy.java | 2 +
.../activemq/transport/ws/jetty11/MQTTSocket.java | 2 +
.../activemq/transport/ws/jetty11/StompSocket.java | 2 +
.../activemq/transport/ws/MQTTWSConnection.java | 2 +
.../activemq/transport/ws/MQTTWSTransportTest.java | 55 +++++++++++++++++++++-
.../transport/ws/StompWSTransportTest.java | 22 +++++++++
7 files changed, 86 insertions(+), 1 deletion(-)
diff --git
a/activemq-http/src/main/java/org/apache/activemq/transport/ws/StompWSConnection.java
b/activemq-http/src/main/java/org/apache/activemq/transport/ws/StompWSConnection.java
index 3d04847b3..8c0fccb1c 100644
---
a/activemq-http/src/main/java/org/apache/activemq/transport/ws/StompWSConnection.java
+++
b/activemq-http/src/main/java/org/apache/activemq/transport/ws/StompWSConnection.java
@@ -17,6 +17,7 @@
package org.apache.activemq.transport.ws;
import java.io.IOException;
+import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
@@ -143,6 +144,7 @@ public class StompWSConnection extends WebSocketAdapter
implements WebSocketList
@Override
public void onWebSocketConnect(org.eclipse.jetty.websocket.api.Session
session) {
this.connection = session;
+ this.connection.setIdleTimeout(Duration.ZERO);
this.connectLatch.countDown();
}
diff --git
a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportProxy.java
b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportProxy.java
index b23339436..a8242d4ed 100644
---
a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportProxy.java
+++
b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportProxy.java
@@ -19,6 +19,7 @@ package org.apache.activemq.transport.ws;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.cert.X509Certificate;
+import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -218,6 +219,7 @@ public final class WSTransportProxy extends
TransportSupport implements Transpor
@Override
public void onWebSocketConnect(Session session) {
this.session = session;
+ this.session.setIdleTimeout(Duration.ZERO);
if (wsTransport.getMaxFrameSize() > 0) {
this.session.getPolicy().setMaxBinaryMessageSize(wsTransport.getMaxFrameSize());
diff --git
a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty11/MQTTSocket.java
b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty11/MQTTSocket.java
index cbfd43113..853c201c3 100644
---
a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty11/MQTTSocket.java
+++
b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty11/MQTTSocket.java
@@ -18,6 +18,7 @@ package org.apache.activemq.transport.ws.jetty11;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -116,6 +117,7 @@ public class MQTTSocket extends AbstractMQTTSocket
implements MQTTCodec.MQTTFram
@Override
public void onWebSocketConnect(Session session) {
this.session = session;
+ this.session.setIdleTimeout(Duration.ZERO);
}
@Override
diff --git
a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty11/StompSocket.java
b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty11/StompSocket.java
index fb13bd256..5c718c8ab 100644
---
a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty11/StompSocket.java
+++
b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty11/StompSocket.java
@@ -17,6 +17,7 @@
package org.apache.activemq.transport.ws.jetty11;
import java.io.IOException;
+import java.time.Duration;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.transport.stomp.Stomp;
@@ -86,6 +87,7 @@ public class StompSocket extends AbstractStompSocket
implements WebSocketListene
@Override
public void onWebSocketConnect(Session session) {
this.session = session;
+ this.session.setIdleTimeout(Duration.ZERO);
}
@Override
diff --git
a/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSConnection.java
b/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSConnection.java
index 73cc0e6a4..e127d077d 100644
---
a/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSConnection.java
+++
b/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSConnection.java
@@ -18,6 +18,7 @@ package org.apache.activemq.transport.ws;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
@@ -282,6 +283,7 @@ public class MQTTWSConnection extends WebSocketAdapter
implements WebSocketListe
@Override
public void onWebSocketConnect(org.eclipse.jetty.websocket.api.Session
session) {
this.connection = session;
+ this.connection.setIdleTimeout(Duration.ZERO);
this.connectLatch.countDown();
}
}
diff --git
a/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSTransportTest.java
b/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSTransportTest.java
index 9dc04a61a..671f77f64 100644
---
a/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSTransportTest.java
+++
b/activemq-http/src/test/java/org/apache/activemq/transport/ws/MQTTWSTransportTest.java
@@ -229,7 +229,7 @@ public class MQTTWSTransportTest extends
WSTransportTestSupport {
TimeUnit.SECONDS.sleep(10);
- assertTrue("Connection should still open", Wait.waitFor(new
Wait.Condition() {
+ assertTrue("Connection should still be open", Wait.waitFor(new
Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
@@ -258,4 +258,57 @@ public class MQTTWSTransportTest extends
WSTransportTestSupport {
}
}));
}
+
+ @Test(timeout = 60000)
+ public void testNoDefaultJettyWebSocketIdleTimeout() throws Exception {
+
+ final AtomicBoolean done = new AtomicBoolean();
+
+ CONNECT command = new CONNECT();
+
+ command.clientId(new UTF8Buffer(UUID.randomUUID().toString()));
+ command.cleanSession(false);
+ command.version(3);
+ command.keepAlive((short) 60);
+
+ wsMQTTConnection.sendFrame(command.encode());
+
+ MQTTFrame received = wsMQTTConnection.receive(15, TimeUnit.SECONDS);
+ if (received == null || received.messageType() != CONNACK.TYPE) {
+ fail("Client did not get expected CONNACK");
+ }
+
+ assertTrue("Connection should open", Wait.waitFor(new Wait.Condition()
{
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return getProxyToBroker().getCurrentConnectionsCount() == 1;
+ }
+ }));
+
+ TimeUnit.SECONDS.sleep(35);
+
+ assertTrue("Connection should still be open",
getProxyToBroker().getCurrentConnectionsCount() == 1);
+
+ wsMQTTConnection.disconnect();
+ wsMQTTConnection.close();
+
+ done.set(true);
+
+ assertTrue("Connection should close", Wait.waitFor(new
Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return getProxyToBroker().getCurrentConnectionsCount() == 0;
+ }
+ }));
+
+ assertTrue("Client Connection should close", Wait.waitFor(new
Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return !wsMQTTConnection.isConnected();
+ }
+ }));
+ }
}
diff --git
a/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSTransportTest.java
b/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSTransportTest.java
index cd17fc9d4..40e3290ac 100644
---
a/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSTransportTest.java
+++
b/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSTransportTest.java
@@ -341,4 +341,26 @@ public class StompWSTransportTest extends
WSTransportTestSupport {
LOG.info("Caught exception on write of disconnect", ex);
}
}
+
+ @Test(timeout = 60000)
+ public void testNoDefaultJettyWebSocketIdleTimeout() throws Exception {
+ String connectFrame = "STOMP\n" +
+ "login:system\n" +
+ "passcode:manager\n" +
+ "accept-version:1.1\n" +
+ "heart-beat:60000,0\n" +
+ "host:localhost\n" +
+ "\n" + Stomp.NULL;
+
+ wsStompConnection.sendRawFrame(connectFrame);
+ String incoming = wsStompConnection.receive(30, TimeUnit.SECONDS);
+ assertTrue(incoming.startsWith("CONNECTED"));
+ assertTrue(incoming.indexOf("version:1.1") >= 0);
+ assertTrue(incoming.indexOf("heart-beat:") >= 0);
+ assertTrue(incoming.indexOf("session:") >= 0);
+
+ TimeUnit.SECONDS.sleep(35);
+
+ wsStompConnection.sendFrame(new StompFrame(Stomp.Commands.DISCONNECT));
+ }
}