This is an automated email from the ASF dual-hosted git repository. reiern70 pushed a commit to branch improvement/reiern70/WICKET-6954-ping-pong-keep-alive in repository https://gitbox.apache.org/repos/asf/wicket.git
commit 96e0e0170f8fb2881fbd11c39dcaf45d5eaead31 Author: reiern70 <[email protected]> AuthorDate: Sat Feb 5 10:26:23 2022 -0500 [WICKET-6954] initial work --- .../wicket/protocol/ws/WebSocketSettings.java | 229 +++++++++++++++++++++ .../ws/api/AbstractWebSocketProcessor.java | 10 + .../protocol/ws/api/BaseWebSocketBehavior.java | 28 +++ .../protocol/ws/api/IWebSocketConnection.java | 48 +++++ .../protocol/ws/api/IWebSocketProcessor.java | 5 + .../ws/api/res/js/wicket-websocket-jquery.js | 25 +++ .../ws/api/res/js/wicket-websocket-setup.js.tmpl | 5 +- .../ws/javax/JavaxWebSocketConnection.java | 56 +++++ .../protocol/ws/javax/JavaxWebSocketProcessor.java | 16 ++ 9 files changed, 421 insertions(+), 1 deletion(-) diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java index 7aca394..dd59f49 100644 --- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java +++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java @@ -41,6 +41,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import jakarta.servlet.http.HttpServletRequest; + +import java.io.IOException; +import java.time.Duration; +import java.util.Date; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -140,6 +146,41 @@ public class WebSocketSettings private IWebSocketConnectionFilter connectionFilter; /** + * Boolean used to determine if ping-pong heart beat will be used. + */ + private boolean useHeartBeat = false; + + /** + * Flag used to determine if client will try to reconnect in case of ping-pong failure + */ + private boolean reconnectOnFailure = false; + + /** + * In case ping or remote client immediately fails, this determines how many times ping + * will be retried before connection it terminated. + */ + private int maxPingRetries = 3; + + /** + * Periodicity by which the heartbeat timer will kick. + */ + private long heartBeatPace = Duration.ofSeconds(15).toMillis(); + + /** + * The max threshold assumed for network latency. + */ + private long networkLatencyThreshold = Duration.ofSeconds(2).toMillis(); + + /** + * The executor that handles delivering pings to remote peers and checking if peers have + * correctly ponged back (and terminates connections in case not). + */ + private Executor heartBeatsExecutor = new HeartBeatsExecutor(); + + // internal heartbeats timer. + private Timer heartBeatsTimer; + + /** * A {@link org.apache.wicket.protocol.ws.api.IWebSocketSessionConfigurer} that allows to configure * {@link org.apache.wicket.protocol.ws.api.IWebSocketSession}s. */ @@ -424,6 +465,167 @@ public class WebSocketSettings return securePort.get(); } + public void setUseHeartBeat(boolean useHeartBeat) + { + this.useHeartBeat = useHeartBeat; + } + + public boolean isUseHeartBeat() + { + return useHeartBeat; + } + + public void setReconnectOnFailure(boolean reconnectOnFailure) + { + this.reconnectOnFailure = reconnectOnFailure; + } + + public boolean isReconnectOnFailure() + { + return reconnectOnFailure; + } + + public long getHeartBeatPace() + { + return heartBeatPace; + } + + public void setHeartBeatPace(long heartBeatPace) + { + this.heartBeatPace = heartBeatPace; + } + + public void setHeartBeatPace(Duration heartBeatPace) + { + this.heartBeatPace = heartBeatPace.toMillis(); + } + + public long getNetworkLatencyThreshold() + { + return networkLatencyThreshold; + } + + public void setNetworkLatencyThreshold(long networkLatencyThreshold) + { + this.networkLatencyThreshold = networkLatencyThreshold; + } + + public void setNetworkLatencyThreshold(Duration networkLatencyThreshold) + { + this.networkLatencyThreshold = networkLatencyThreshold.toMillis(); + } + + public void setHeartBeatsExecutor(Executor heartBeatsExecutor) + { + this.heartBeatsExecutor = heartBeatsExecutor; + } + + public void setMaxPingRetries(int maxPingRetries) + { + this.maxPingRetries = maxPingRetries; + } + + + public void startHeartBeatTimer(String application) + { + if (useHeartBeat == false) + { + LOG.debug("useClientSideHeartBeat is set to false. Thus we won't start heartbeat's sending thread"); + } + + if (LOG.isDebugEnabled()) + { + LOG.debug("Starting thread pushing hart beats"); + } + + TimerTask timerTask = new TimerTask() + { + @Override + public void run() + { + try + { + sendHeartBeats(application); + } + catch (Exception e) + { + LOG.error("Error while checking sessions", e); + } + } + }; + + this.heartBeatsTimer = new Timer(true); + this.heartBeatsTimer.schedule(timerTask, new Date(), Duration.ofSeconds(heartBeatPace).toMillis()); + } + + public void stopHeartBeatTimer() + { + if (LOG.isDebugEnabled()) + { + LOG.debug("Stopping thread pushing hart beats"); + } + if (this.heartBeatsTimer != null) + { + this.heartBeatsTimer.cancel(); + } + } + + private void sendHeartBeats(Application application) + { + for (IWebSocketConnection connection: connectionRegistry.getConnections(application)) + { + // connection didn't received the PONG from peer terminate it + if (connection.isAlive() == false) + { + if (connection.getLastTimeAlive() - System.currentTimeMillis() > (heartBeatPace + networkLatencyThreshold)) + { + heartBeatsExecutor.run(() -> terminateConnection(connection)); + } + } + else + { + heartBeatsExecutor.run(() -> ping(connection, maxPingRetries)); + } + } + } + + private void sendHeartBeats(String application) + { + sendHeartBeats(Application.get(application)); + } + + private void ping(IWebSocketConnection connection, final int pingRetryCounter) + { + try + { + connection.ping(); + } + catch (IOException e) + { + if (pingRetryCounter == 0) + { + // ping failed enough times kill connection + terminateConnection(connection); + } + else + { + ping(connection, pingRetryCounter - 1); + } + } + } + + private void terminateConnection(IWebSocketConnection connection) + { + connection.setAlive(false); + if (LOG.isDebugEnabled()) + { + LOG.debug("Terminating connection with ID {} because ping of remote peer failed {} times", + connection.getKey(), maxPingRetries); + } + connection.terminate("Failed to ping remote peer"); + connectionRegistry.removeConnection(connection.getApplication(), connection.getSessionId(), connection.getKey()); + } + /** * Simple executor that runs the tasks in the caller thread. */ @@ -487,6 +689,33 @@ public class WebSocketSettings } } + public static class HeartBeatsExecutor implements Executor + { + + private final java.util.concurrent.Executor executor; + + + public HeartBeatsExecutor() + { + this(new ThreadPoolExecutor(1, 8, + 60L, TimeUnit.SECONDS, + new SynchronousQueue<>(), + new ThreadFactory())); + } + + public HeartBeatsExecutor(java.util.concurrent.Executor executor) + { + this.executor = executor; + + } + + @Override + public void run(final Runnable command) + { + executor.execute(command); + } + } + public static class ThreadFactory implements java.util.concurrent.ThreadFactory { private final AtomicInteger counter = new AtomicInteger(); diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketProcessor.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketProcessor.java index 59e49d1..0290544 100644 --- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketProcessor.java +++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketProcessor.java @@ -16,6 +16,8 @@ */ package org.apache.wicket.protocol.ws.api; +import java.nio.ByteBuffer; + import jakarta.servlet.http.HttpServletRequest; import jakarta.servlet.http.HttpSession; @@ -147,6 +149,14 @@ public abstract class AbstractWebSocketProcessor implements IWebSocketProcessor this.connectionFilter = webSocketSettings.getConnectionFilter(); } + + @Override + public void onPong(ByteBuffer byteBuffer) { + IKey key = getRegistryKey(); + IWebSocketConnection webSocketConnection = connectionRegistry.getConnection(getApplication(), getSessionId(), key); + webSocketConnection.onPong(byteBuffer); + } + @Override public void onMessage(final String message) { diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/BaseWebSocketBehavior.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/BaseWebSocketBehavior.java index 4e37f54..a79f8e7 100644 --- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/BaseWebSocketBehavior.java +++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/BaseWebSocketBehavior.java @@ -162,6 +162,14 @@ public class BaseWebSocketBehavior extends Behavior Integer securePort = getSecurePort(webSocketSettings); variables.put("securePort", securePort); + variables.put("useHeartBeat", isUseHeartBeat(webSocketSettings)); + + variables.put("reconnectOnFailure", isReconnectOnFailure(webSocketSettings)); + + variables.put("heartBeatPace", getHeartBeatPace(webSocketSettings)); + + variables.put("networkLatencyThreshold", getNetworkLatencyThreshold(webSocketSettings)); + CharSequence contextPath = getContextPath(webSocketSettings); Args.notNull(contextPath, "contextPath"); variables.put("contextPath", contextPath); @@ -178,6 +186,26 @@ public class BaseWebSocketBehavior extends Behavior return variables; } + protected boolean isUseHeartBeat(WebSocketSettings webSocketSettings) + { + return webSocketSettings.isUseHeartBeat(); + } + + protected boolean isReconnectOnFailure(WebSocketSettings webSocketSettings) + { + return webSocketSettings.isReconnectOnFailure(); + } + + protected long getHeartBeatPace(WebSocketSettings webSocketSettings) + { + return webSocketSettings.getHeartBeatPace(); + } + + protected long getNetworkLatencyThreshold(WebSocketSettings webSocketSettings) + { + return webSocketSettings.getNetworkLatencyThreshold(); + } + protected Integer getPort(WebSocketSettings webSocketSettings) { return webSocketSettings.getPort(); diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketConnection.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketConnection.java index 82cf6e9..54f6027 100644 --- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketConnection.java +++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketConnection.java @@ -17,6 +17,7 @@ package org.apache.wicket.protocol.ws.api; import java.io.IOException; +import java.nio.ByteBuffer; import org.apache.wicket.Application; import org.apache.wicket.protocol.ws.api.message.IWebSocketPushMessage; @@ -30,6 +31,53 @@ import org.apache.wicket.protocol.ws.api.registry.IKey; public interface IWebSocketConnection { /** + * + * @return the last time connection was checked to be alive (checked via ping/pong mechanism) + */ + long getLastTimeAlive(); + + /** + * + * @return {@code true} if connection is alive (checked via ping/pong mechanism). {@code false} otherwise + */ + boolean isAlive(); + + /** + * Allows setting whether connection is alive or not. + * + * @param alive Alive + */ + void setAlive(boolean alive); + + /** + * Terminates the connection. + * + * @param reason The reason to terminate connection. + */ + void terminate(String reason); + + /** + * Sends a ping message to the server. + * + * @throws IOException if something went wrong with ping + */ + void ping() throws IOException; + + + /** + * Allows the developer to send an unsolicited Pong message containing the given application data in order to serve + * as a unidirectional heartbeat for the session. + */ + void pong() throws IOException; + + /** + * Called when remote peer answers to ping with pong message. + * + * @param byteBuffer Contains application specific content + */ + void onPong(ByteBuffer byteBuffer); + + /** * @return {@code true} when the underlying native web socket * connection is still open. */ diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketProcessor.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketProcessor.java index 74b34d1..5aa9391 100644 --- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketProcessor.java +++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketProcessor.java @@ -16,8 +16,11 @@ */ package org.apache.wicket.protocol.ws.api; +import java.nio.ByteBuffer; + import org.apache.wicket.protocol.http.WebApplication; import org.apache.wicket.protocol.ws.WebSocketSettings; +import org.apache.wicket.protocol.ws.api.registry.IKey; /** * Processes web socket messages. @@ -41,6 +44,8 @@ public interface IWebSocketProcessor WebSocketSettings.Holder.get(application).getSocketSessionConfigurer().configureSession(webSocketSession); } + void onPong(ByteBuffer byteBuffer); + /** * Called when a text message arrives from the client * diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/res/js/wicket-websocket-jquery.js b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/res/js/wicket-websocket-jquery.js index 93b7386..55086dc 100644 --- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/res/js/wicket-websocket-jquery.js +++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/res/js/wicket-websocket-jquery.js @@ -92,8 +92,15 @@ self.ws.onopen = function (evt) { Wicket.Event.publish(topics.Opened, evt); + if (Wicket.WebSocket.useHeartBeat) { + self.heartbeat(); + } }; + if (WWS.useHeartBeat) { + self.ws.on("ping", self.heartbeat) + } + self.ws.onmessage = function (event) { var message = event.data; @@ -139,6 +146,24 @@ } }, + heartbeat: function () { + clearTimeout(this.pingTimeout); + + // Use `WebSocket#terminate()`, which immediately destroys the connection, + // instead of `WebSocket#close()`, which waits for the close timer. + // Delay should be equal to the interval at which your server + // sends out pings plus a conservative assumption of the latency. + this.pingTimeout = setTimeout(() => { + this.ws.terminate(); + // try to reconnect to server + if (Wicket.WebSocket.reconnectOnFailure) + { + Wicket.WebSocket.INSTANCE = null; + Wicket.WebSocket.createDefaultConnection(); + } + }, Wicket.WebSocket.heartBeatPace + Wicket.WebSocket.networkLatencyThreshold); + }, + send: function (text) { if (this.ws && text) { Wicket.Log.info('[WebSocket.send] Sending: ' + text); diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/res/js/wicket-websocket-setup.js.tmpl b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/res/js/wicket-websocket-setup.js.tmpl index 98e9bc8..8fa142a 100644 --- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/res/js/wicket-websocket-setup.js.tmpl +++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/res/js/wicket-websocket-setup.js.tmpl @@ -4,7 +4,10 @@ if (typeof(Wicket.WebSocket.appName) === "undefined") { jQuery.extend(Wicket.WebSocket, { pageId: ${pageId}, context: '${context}', resourceName: '${resourceName}', connectionToken: '${connectionToken}', baseUrl: '${baseUrl}', contextPath: '${contextPath}', appName: '${applicationName}', - port: ${port}, securePort: ${securePort}, filterPrefix: '${filterPrefix}', sessionId: '${sessionId}' }); + port: ${port}, securePort: ${securePort}, filterPrefix: '${filterPrefix}', sessionId: '${sessionId}', + useHeartBeat: ${useHeartBeat}, reconnectOnFailure: ${reconnectOnFailure}, + heartBeatPace: ${heartBeatPace}, networkLatencyThreshold: ${networkLatencyThreshold}); + Wicket.WebSocket.createDefaultConnection(); } })(); diff --git a/wicket-native-websocket/wicket-native-websocket-javax/src/main/java/org/apache/wicket/protocol/ws/javax/JavaxWebSocketConnection.java b/wicket-native-websocket/wicket-native-websocket-javax/src/main/java/org/apache/wicket/protocol/ws/javax/JavaxWebSocketConnection.java index b9f6419..4d690ec 100644 --- a/wicket-native-websocket/wicket-native-websocket-javax/src/main/java/org/apache/wicket/protocol/ws/javax/JavaxWebSocketConnection.java +++ b/wicket-native-websocket/wicket-native-websocket-javax/src/main/java/org/apache/wicket/protocol/ws/javax/JavaxWebSocketConnection.java @@ -18,6 +18,8 @@ package org.apache.wicket.protocol.ws.javax; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import jakarta.websocket.CloseReason; import jakarta.websocket.Session; @@ -40,6 +42,9 @@ public class JavaxWebSocketConnection extends AbstractWebSocketConnection private final Session session; + private final AtomicBoolean alive = new AtomicBoolean(false); + private final AtomicLong lastTimeAlive = new AtomicLong(System.currentTimeMillis()); + /** * Constructor. * @@ -50,6 +55,57 @@ public class JavaxWebSocketConnection extends AbstractWebSocketConnection { super(webSocketProcessor); this.session = Args.notNull(session, "session"); + setAlive(true); + } + + @Override + public long getLastTimeAlive() + { + return lastTimeAlive.get(); + } + + @Override + public boolean isAlive() + { + return alive.get(); + } + + @Override + public void setAlive(boolean alive) + { + if (alive) + { + // is connection if alive we set the timestamp. + this.lastTimeAlive.set(System.currentTimeMillis()); + } + this.alive.set(alive); + } + + @Override + public synchronized void terminate(String reason) + { + close(CloseReason.CloseCodes.GOING_AWAY.getCode(), reason); + } + + @Override + public void ping() throws IOException + { + ByteBuffer buf = ByteBuffer.wrap(new byte[]{0xA}); + session.getBasicRemote().sendPing(buf); + } + + @Override + public void pong() throws IOException + { + ByteBuffer buf = ByteBuffer.wrap(new byte[]{0xA}); + session.getBasicRemote().sendPong(buf); + } + + @Override + public void onPong(ByteBuffer byteBuffer) + { + // we received pong answer from remote peer. Thus, connection is alive + setAlive(true); } @Override diff --git a/wicket-native-websocket/wicket-native-websocket-javax/src/main/java/org/apache/wicket/protocol/ws/javax/JavaxWebSocketProcessor.java b/wicket-native-websocket/wicket-native-websocket-javax/src/main/java/org/apache/wicket/protocol/ws/javax/JavaxWebSocketProcessor.java index c393517..cd3adf8 100644 --- a/wicket-native-websocket/wicket-native-websocket-javax/src/main/java/org/apache/wicket/protocol/ws/javax/JavaxWebSocketProcessor.java +++ b/wicket-native-websocket/wicket-native-websocket-javax/src/main/java/org/apache/wicket/protocol/ws/javax/JavaxWebSocketProcessor.java @@ -20,12 +20,15 @@ import java.nio.ByteBuffer; import jakarta.websocket.EndpointConfig; import jakarta.websocket.MessageHandler; +import jakarta.websocket.PongMessage; import jakarta.websocket.Session; import org.apache.wicket.protocol.http.WebApplication; import org.apache.wicket.protocol.ws.WebSocketSettings; import org.apache.wicket.protocol.ws.api.AbstractWebSocketProcessor; import org.apache.wicket.protocol.ws.api.IWebSocketSession; +import org.apache.wicket.protocol.ws.api.message.TextMessage; +import org.apache.wicket.protocol.ws.api.registry.IKey; /** * An {@link org.apache.wicket.protocol.ws.api.IWebSocketProcessor processor} that integrates with @@ -53,6 +56,7 @@ public class JavaxWebSocketProcessor extends AbstractWebSocketProcessor session.addMessageHandler(new StringMessageHandler()); session.addMessageHandler(new BinaryMessageHandler()); + session.addMessageHandler(new PongMessageMessageHandler()); } @Override @@ -60,6 +64,18 @@ public class JavaxWebSocketProcessor extends AbstractWebSocketProcessor { } + + private class PongMessageMessageHandler implements MessageHandler.Whole<PongMessage> + { + @Override + public void onMessage(PongMessage message) + { + IKey key = getRegistryKey(); + JavaxWebSocketProcessor.this.onPong(message.getApplicationData()); + } + } + + private class StringMessageHandler implements MessageHandler.Whole<String> { @Override
