Repository: activemq
Updated Branches:
  refs/heads/activemq-5.14.x fd3853c24 -> fdf1537eb
https://issues.apache.org/jira/browse/AMQ-6482 Adding a timeout for websocket sends to prevent the transport thread from getting stuck and blocking. The default is 30 seconds. (cherry picked from commit 450cabe4ead1fb78eec2e94013d2868a5bf864da) Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/fdf1537e Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/fdf1537e Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/fdf1537e Branch: refs/heads/activemq-5.14.x Commit: fdf1537eb8c3bf08668d452f2c816cb28acd8f00 Parents: fd3853c Author: Christopher L. Shannon (cshannon) <[email protected]> Authored: Thu Nov 3 08:41:37 2016 -0400 Committer: Christopher L. Shannon (cshannon) <[email protected]> Committed: Fri Nov 4 12:59:04 2016 -0400 ---------------------------------------------------------------------- .../apache/activemq/transport/ws/jetty9/MQTTSocket.java | 12 +++++++++++- .../activemq/transport/ws/jetty9/StompSocket.java | 12 +++++++++++- 2 files changed, 22 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/fdf1537e/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java ---------------------------------------------------------------------- diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java index d8c248d..2b4be15 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/MQTTSocket.java @@ -46,7 +46,13 @@ public class MQTTSocket extends AbstractMQTTSocket implements WebSocketListener @Override public void sendToMQTT(MQTTFrame command) throws IOException { ByteSequence bytes = wireFormat.marshal(command); - 
session.getRemote().sendBytes(ByteBuffer.wrap(bytes.getData(), 0, bytes.getLength())); + try { + //timeout after a period of time so we don't wait forever and hold the protocol lock + session.getRemote().sendBytesByFuture( + ByteBuffer.wrap(bytes.getData(), 0, bytes.getLength())).get(getDefaultSendTimeOut(), TimeUnit.SECONDS); + } catch (Exception e) { + throw IOExceptionSupport.create(e); + } } @Override @@ -117,4 +123,8 @@ public class MQTTSocket extends AbstractMQTTSocket implements WebSocketListener @Override public void onWebSocketText(String arg0) { } + + private static int getDefaultSendTimeOut() { + return Integer.getInteger("org.apache.activemq.transport.ws.MQTTSocket.sendTimeout", 30); + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/fdf1537e/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java ---------------------------------------------------------------------- diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java index ee012db..72efef7 100644 --- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java @@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit; import org.apache.activemq.transport.stomp.Stomp; import org.apache.activemq.transport.stomp.StompFrame; import org.apache.activemq.transport.ws.AbstractStompSocket; +import org.apache.activemq.util.IOExceptionSupport; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.WebSocketListener; import org.slf4j.Logger; @@ -44,7 +45,12 @@ public class StompSocket extends AbstractStompSocket implements WebSocketListene @Override public void sendToStomp(StompFrame command) throws IOException { - session.getRemote().sendString(command.format()); + try { + //timeout after a period of 
time so we don't wait forever and hold the protocol lock + session.getRemote().sendStringByFuture(command.format()).get(getDefaultSendTimeOut(), TimeUnit.SECONDS); + } catch (Exception e) { + throw IOExceptionSupport.create(e); + } } @Override @@ -89,4 +95,8 @@ public class StompSocket extends AbstractStompSocket implements WebSocketListene public void onWebSocketText(String data) { processStompFrame(data); } + + private static int getDefaultSendTimeOut() { + return Integer.getInteger("org.apache.activemq.transport.ws.StompSocket.sendTimeout", 30); + } }
