This is an automated email from the ASF dual-hosted git repository. reiern70 pushed a commit to branch improvement/reiern70/WICKET-6954-ping-pong-keep-alive-9-x in repository https://gitbox.apache.org/repos/asf/wicket.git
commit 81e97270ad15923f922e8eaf971ef61e8afbeb18 Author: reiern70 <[email protected]> AuthorDate: Sat Feb 5 07:26:23 2022 -0500 [WICKET-6954] implement server side heart-beat + client reconnection in case of inactivity --- .../examples/websocket/JSR356Application.java | 11 +- .../websocket/WebSocketBehaviorDemoPage.java | 10 +- .../wicket/protocol/ws/WebSocketSettings.java | 165 ++++++++++++++++++++- .../ws/api/AbstractWebSocketProcessor.java | 30 +++- .../protocol/ws/api/BaseWebSocketBehavior.java | 28 ++++ .../protocol/ws/api/IWebSocketConnection.java | 50 +++++++ .../protocol/ws/api/IWebSocketProcessor.java | 10 ++ .../protocol/ws/api/message/ConnectedMessage.java | 17 +++ ...nnectedMessage.java => PongMessageMessage.java} | 29 +++- .../ws/api/res/js/wicket-websocket-jquery.js | 32 ++++ .../ws/api/res/js/wicket-websocket-setup.js.tmpl | 5 +- .../protocol/ws/timer/AbstractHeartBeatTimer.java | 92 ++++++++++++ .../ws/timer/HeartBeatWithReconnectTimer.java | 77 ++++++++++ .../protocol/ws/timer/PingPongHeartBeatTimer.java | 105 +++++++++++++ .../ws/util/tester/TestWebSocketConnection.java | 53 ++++++- .../ws/util/tester/TestWebSocketProcessor.java | 18 ++- .../protocol/ws/util/tester/WebSocketTester.java | 22 +++ .../util/tester/WebSocketTesterProcessorTest.java | 5 + .../ws/javax/JavaxWebSocketConnection.java | 80 +++++++++- .../protocol/ws/javax/JavaxWebSocketProcessor.java | 19 ++- 20 files changed, 834 insertions(+), 24 deletions(-) diff --git a/wicket-examples/src/main/java/org/apache/wicket/examples/websocket/JSR356Application.java b/wicket-examples/src/main/java/org/apache/wicket/examples/websocket/JSR356Application.java index 8bc6e9d..1a6462e 100644 --- a/wicket-examples/src/main/java/org/apache/wicket/examples/websocket/JSR356Application.java +++ b/wicket-examples/src/main/java/org/apache/wicket/examples/websocket/JSR356Application.java @@ -23,8 +23,8 @@ import org.apache.wicket.protocol.http.WebApplication; import org.apache.wicket.protocol.https.HttpsConfig; import org.apache.wicket.protocol.https.HttpsMapper; import org.apache.wicket.protocol.ws.WebSocketSettings; -import org.apache.wicket.protocol.ws.api.IWebSocketSession; -import org.apache.wicket.protocol.ws.api.IWebSocketSessionConfigurer; +import org.apache.wicket.protocol.ws.timer.HeartBeatWithReconnectTimer; +import org.apache.wicket.protocol.ws.timer.PingPongHeartBeatTimer; import org.apache.wicket.request.Request; import org.apache.wicket.request.Response; import org.slf4j.Logger; @@ -43,6 +43,7 @@ public class JSR356Application extends WicketExampleApplication private static final Logger LOGGER = LoggerFactory.getLogger(JSR356Application.class); private ScheduledExecutorService scheduledExecutorService; + private HeartBeatWithReconnectTimer heartBeatWithReconnectTimer; @Override public Class<HomePage> getHomePage() @@ -83,6 +84,10 @@ public class JSR356Application extends WicketExampleApplication webSocketSettings.setSecurePort(8443); } + webSocketSettings.setUseHeartBeat(true); + webSocketSettings.setReconnectOnFailure(true); + heartBeatWithReconnectTimer = new HeartBeatWithReconnectTimer(webSocketSettings); + heartBeatWithReconnectTimer.start(this); // The websocket example loads JS from ajax.googleapis.com, which is not allowed by the CSP. // This now serves as an example on how to disable CSP getCspSettings().blocking().disabled(); @@ -97,7 +102,7 @@ public class JSR356Application extends WicketExampleApplication @Override protected void onDestroy() { scheduledExecutorService.shutdownNow(); - + heartBeatWithReconnectTimer.stop(); super.onDestroy(); } diff --git a/wicket-examples/src/main/java/org/apache/wicket/examples/websocket/WebSocketBehaviorDemoPage.java b/wicket-examples/src/main/java/org/apache/wicket/examples/websocket/WebSocketBehaviorDemoPage.java index c6bff98..76d0781 100644 --- a/wicket-examples/src/main/java/org/apache/wicket/examples/websocket/WebSocketBehaviorDemoPage.java +++ b/wicket-examples/src/main/java/org/apache/wicket/examples/websocket/WebSocketBehaviorDemoPage.java @@ -92,12 +92,14 @@ public class WebSocketBehaviorDemoPage extends WicketExamplePage private static final long serialVersionUID = 1L; @Override - protected void onConnect(ConnectedMessage message) - { + protected void onConnect(ConnectedMessage message) { super.onConnect(message); - ScheduledExecutorService service = JSR356Application.get().getScheduledExecutorService(); - ChartUpdater.start(message, service); + if (!message.isReconnected()) + { + ScheduledExecutorService service = JSR356Application.get().getScheduledExecutorService(); + ChartUpdater.start(message, service); + } } }); add(downloadingContainer.setOutputMarkupPlaceholderTag(true).setVisible(false)); diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java index df78a18..b0d579d 100644 --- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java +++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java @@ -40,7 +40,7 @@ import org.apache.wicket.util.string.Strings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.servlet.http.HttpServletRequest; +import java.time.Duration; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -48,6 +48,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import javax.servlet.http.HttpServletRequest; + /** * Web Socket related settings. * @@ -88,6 +90,7 @@ public class WebSocketSettings private final AtomicInteger port = new AtomicInteger(); private final AtomicInteger securePort = new AtomicInteger(); + /** * Holds this WebSocketSettings in the Application's metadata. * This way wicket-core module doesn't have reference to wicket-native-websocket. @@ -140,6 +143,49 @@ public class WebSocketSettings private IWebSocketConnectionFilter connectionFilter; /** + * Boolean used to determine if ping-pong heart beat will be used. + */ + private boolean useHeartBeat = false; + + + /** + * Boolean used to determine if ping-pong heart beat will be used. + */ + private boolean usePingPongHeartBeat = false; + + /** + * Flag used to determine if client will try to reconnect in case of ping-pong failure + */ + private boolean reconnectOnFailure = false; + + /** + * In case ping or remote client immediately fails, this determines how many times ping + * will be retried before connection it terminated. + */ + private int maxPingRetries = 3; + + /** + * Periodicity by which the heartbeat timer will kick. + */ + private long heartBeatPace = Duration.ofSeconds(15).toMillis(); + + /** + * The max threshold assumed for network latency. + */ + private long networkLatencyThreshold = Duration.ofSeconds(2).toMillis(); + + /** + * The executor that handles delivering pings to remote peers and checking if peers have + * correctly ponged back (and terminates connections in case not). + */ + private Executor heartBeatsExecutor = new HeartBeatsExecutor(); + + /** + * Whether messages are broadcast when receiving pong messages + */ + private boolean sendMessagesOnPong = false; + + /** * A {@link org.apache.wicket.protocol.ws.api.IWebSocketSessionConfigurer} that allows to configure * {@link org.apache.wicket.protocol.ws.api.IWebSocketSession}s. */ @@ -424,6 +470,96 @@ public class WebSocketSettings return securePort.get(); } + public void setUseHeartBeat(boolean useHeartBeat) + { + this.useHeartBeat = useHeartBeat; + } + + public boolean isUseHeartBeat() + { + return useHeartBeat; + } + + public void setReconnectOnFailure(boolean reconnectOnFailure) + { + this.reconnectOnFailure = reconnectOnFailure; + } + + public boolean isReconnectOnFailure() + { + return reconnectOnFailure; + } + + public long getHeartBeatPace() + { + return heartBeatPace; + } + + public void setHeartBeatPace(long heartBeatPace) + { + this.heartBeatPace = heartBeatPace; + } + + public void setHeartBeatPace(Duration heartBeatPace) + { + this.heartBeatPace = heartBeatPace.toMillis(); + } + + public long getNetworkLatencyThreshold() + { + return networkLatencyThreshold; + } + + public void setNetworkLatencyThreshold(long networkLatencyThreshold) + { + this.networkLatencyThreshold = networkLatencyThreshold; + } + + public void setNetworkLatencyThreshold(Duration networkLatencyThreshold) + { + this.networkLatencyThreshold = networkLatencyThreshold.toMillis(); + } + + public void setHeartBeatsExecutor(Executor heartBeatsExecutor) + { + this.heartBeatsExecutor = heartBeatsExecutor; + } + + public void setMaxPingRetries(int maxPingRetries) + { + this.maxPingRetries = maxPingRetries; + } + + public void setSendMessagesOnPong(boolean sendMessagesOnPong) + { + this.sendMessagesOnPong = sendMessagesOnPong; + } + + public boolean isSendMessagesOnPong() + { + return sendMessagesOnPong; + } + + public Executor getHeartBeatsExecutor() + { + return heartBeatsExecutor; + } + + public int getMaxPingRetries() + { + return maxPingRetries; + } + + public void setUsePingPongHeartBeat(boolean usePingPongHeartBeat) + { + this.usePingPongHeartBeat = usePingPongHeartBeat; + } + + public boolean isUsePingPongHeartBeat() + { + return usePingPongHeartBeat; + } + /** * Simple executor that runs the tasks in the caller thread. */ @@ -487,6 +623,33 @@ public class WebSocketSettings } } + public static class HeartBeatsExecutor implements Executor + { + + private final java.util.concurrent.Executor executor; + + + public HeartBeatsExecutor() + { + this(new ThreadPoolExecutor(1, 8, + 60L, TimeUnit.SECONDS, + new SynchronousQueue<>(), + new ThreadFactory())); + } + + public HeartBeatsExecutor(java.util.concurrent.Executor executor) + { + this.executor = executor; + + } + + @Override + public void run(final Runnable command) + { + executor.execute(command); + } + } + public static class ThreadFactory implements java.util.concurrent.ThreadFactory { private final AtomicInteger counter = new AtomicInteger(); diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketProcessor.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketProcessor.java index bf5a6a8..4a18ce4 100644 --- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketProcessor.java +++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketProcessor.java @@ -16,6 +16,8 @@ */ package org.apache.wicket.protocol.ws.api; +import java.nio.ByteBuffer; + import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpSession; @@ -45,6 +47,7 @@ import org.apache.wicket.protocol.ws.api.message.ConnectedMessage; import org.apache.wicket.protocol.ws.api.message.ErrorMessage; import org.apache.wicket.protocol.ws.api.message.IWebSocketMessage; import org.apache.wicket.protocol.ws.api.message.IWebSocketPushMessage; +import org.apache.wicket.protocol.ws.api.message.PongMessageMessage; import org.apache.wicket.protocol.ws.api.message.TextMessage; import org.apache.wicket.protocol.ws.api.registry.IKey; import org.apache.wicket.protocol.ws.api.registry.IWebSocketConnectionRegistry; @@ -81,7 +84,6 @@ public abstract class AbstractWebSocketProcessor implements IWebSocketProcessor * A pageId indicating that the endpoint is WebSocketResource */ static final int NO_PAGE_ID = -1; - static final String NO_PAGE_CLASS = "_NO_PAGE"; private final WebRequest webRequest; private final int pageId; @@ -149,6 +151,24 @@ public abstract class AbstractWebSocketProcessor implements IWebSocketProcessor } @Override + public void onPong(ByteBuffer byteBuffer) + { + IKey key = getRegistryKey(); + WebApplication application = getApplication(); + String sessionId = getSessionId(); + IWebSocketConnection webSocketConnection = connectionRegistry.getConnection(application, sessionId, key); + if (webSocketConnection != null) + { + webSocketConnection.onPong(byteBuffer); + if (webSocketSettings.isSendMessagesOnPong()) + { + // if we want to deliver messages on pong deliver them + broadcastMessage(new PongMessageMessage(application, sessionId, key, byteBuffer)); + } + } + } + + @Override public void onMessage(final String message) { broadcastMessage(new TextMessage(getApplication(), getSessionId(), getRegistryKey(), message)); @@ -168,7 +188,7 @@ public abstract class AbstractWebSocketProcessor implements IWebSocketProcessor * the web socket connection to use to communicate with the client * @see #onOpen(Object) */ - protected final void onConnect(final IWebSocketConnection connection) { + protected final void onConnect(final IWebSocketConnection connection, boolean reconnected) { IKey key = getRegistryKey(); connectionRegistry.setConnection(getApplication(), getSessionId(), key, connection); @@ -184,7 +204,7 @@ public abstract class AbstractWebSocketProcessor implements IWebSocketProcessor } } - broadcastMessage(new ConnectedMessage(getApplication(), getSessionId(), key), connection); + broadcastMessage(new ConnectedMessage(getApplication(), getSessionId(), key, reconnected), connection); } @Override @@ -202,7 +222,9 @@ public abstract class AbstractWebSocketProcessor implements IWebSocketProcessor { if (webSocketSettings.shouldNotifyOnErrorEvent(t)) { IKey key = getRegistryKey(); - broadcastMessage(new ErrorMessage(getApplication(), getSessionId(), key, t)); + IWebSocketConnection connection = connectionRegistry.getConnection(application, sessionId, key); + ErrorMessage message = new ErrorMessage(application, sessionId, key, t); + broadcastMessage(message, connection); } } diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/BaseWebSocketBehavior.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/BaseWebSocketBehavior.java index 6bf3b4a..a386056 100644 --- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/BaseWebSocketBehavior.java +++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/BaseWebSocketBehavior.java @@ -161,6 +161,14 @@ public class BaseWebSocketBehavior extends Behavior Integer securePort = getSecurePort(webSocketSettings); variables.put("securePort", securePort); + variables.put("useHeartBeat", isUseHeartBeat(webSocketSettings)); + + variables.put("reconnectOnFailure", isReconnectOnFailure(webSocketSettings)); + + variables.put("heartBeatPace", getHeartBeatPace(webSocketSettings)); + + variables.put("networkLatencyThreshold", getNetworkLatencyThreshold(webSocketSettings)); + CharSequence contextPath = getContextPath(webSocketSettings); Args.notNull(contextPath, "contextPath"); variables.put("contextPath", contextPath); @@ -177,6 +185,26 @@ public class BaseWebSocketBehavior extends Behavior return variables; } + protected boolean isUseHeartBeat(WebSocketSettings webSocketSettings) + { + return webSocketSettings.isUseHeartBeat(); + } + + protected boolean isReconnectOnFailure(WebSocketSettings webSocketSettings) + { + return webSocketSettings.isReconnectOnFailure(); + } + + protected long getHeartBeatPace(WebSocketSettings webSocketSettings) + { + return webSocketSettings.getHeartBeatPace(); + } + + protected long getNetworkLatencyThreshold(WebSocketSettings webSocketSettings) + { + return webSocketSettings.getNetworkLatencyThreshold(); + } + protected Integer getPort(WebSocketSettings webSocketSettings) { return webSocketSettings.getPort(); diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketConnection.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketConnection.java index 82cf6e9..b09cb98 100644 --- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketConnection.java +++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketConnection.java @@ -17,6 +17,7 @@ package org.apache.wicket.protocol.ws.api; import java.io.IOException; +import java.nio.ByteBuffer; import org.apache.wicket.Application; import org.apache.wicket.protocol.ws.api.message.IWebSocketPushMessage; @@ -30,6 +31,53 @@ import org.apache.wicket.protocol.ws.api.registry.IKey; public interface IWebSocketConnection { /** + * + * @return the last time connection was checked to be alive (checked via ping/pong mechanism) + */ + long getLastTimeAlive(); + + /** + * + * @return {@code true} if connection is alive (checked via ping/pong mechanism). {@code false} otherwise + */ + boolean isAlive(); + + /** + * Allows setting whether connection is alive or not. + * + * @param alive Alive + */ + void setAlive(boolean alive); + + /** + * Terminates the connection. + * + * @param reason The reason to terminate connection. + */ + void terminate(String reason); + + /** + * Sends a ping message to the server. + * + * @throws IOException if something went wrong with ping + */ + void ping() throws IOException; + + + /** + * Allows the developer to send an unsolicited Pong message containing the given application data in order to serve + * as a unidirectional heartbeat for the session. + */ + void pong() throws IOException; + + /** + * Called when remote peer answers to ping with pong message. + * + * @param byteBuffer Contains application specific content + */ + void onPong(ByteBuffer byteBuffer); + + /** * @return {@code true} when the underlying native web socket * connection is still open. */ @@ -69,6 +117,8 @@ public interface IWebSocketConnection */ IWebSocketConnection sendMessage(byte[] message, int offset, int length) throws IOException; + IWebSocketConnection sendMessage(byte[] message) throws IOException; + /** * Broadcasts a push message to the wicket page (and it's components) associated with this * connection. The components can then send messages or component updates to client by adding diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketProcessor.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketProcessor.java index 74b34d1..708002f 100644 --- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketProcessor.java +++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketProcessor.java @@ -16,8 +16,11 @@ */ package org.apache.wicket.protocol.ws.api; +import java.nio.ByteBuffer; + import org.apache.wicket.protocol.http.WebApplication; import org.apache.wicket.protocol.ws.WebSocketSettings; +import org.apache.wicket.protocol.ws.api.registry.IKey; /** * Processes web socket messages. @@ -42,6 +45,13 @@ public interface IWebSocketProcessor } /** + * Called when remote peer answers to ping with pong message. + * + * @param byteBuffer Contains application specific content + */ + void onPong(ByteBuffer byteBuffer); + + /** * Called when a text message arrives from the client * * @param message diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/message/ConnectedMessage.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/message/ConnectedMessage.java index 8e0485f..b69b8ae 100644 --- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/message/ConnectedMessage.java +++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/message/ConnectedMessage.java @@ -27,9 +27,26 @@ import org.apache.wicket.protocol.ws.api.registry.IKey; */ public class ConnectedMessage extends AbstractClientMessage { + private final boolean reconnected; + + public ConnectedMessage(Application application, String sessionId, IKey key, boolean reconnected) + { + super(application, sessionId, key); + this.reconnected = reconnected; + } + public ConnectedMessage(Application application, String sessionId, IKey key) { super(application, sessionId, key); + reconnected = false; + } + + /** + * @return {@code true} if connection happened because client initiated a reconnection + */ + public boolean isReconnected() + { + return reconnected; } @Override diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/message/ConnectedMessage.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/message/PongMessageMessage.java similarity index 59% copy from wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/message/ConnectedMessage.java copy to wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/message/PongMessageMessage.java index 8e0485f..0708f82 100644 --- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/message/ConnectedMessage.java +++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/message/PongMessageMessage.java @@ -16,25 +16,40 @@ */ package org.apache.wicket.protocol.ws.api.message; +import java.nio.ByteBuffer; + import org.apache.wicket.Application; import org.apache.wicket.protocol.ws.api.registry.IKey; +import org.apache.wicket.util.lang.Args; /** - * A {@link IWebSocketMessage message} when a client creates web socket - * connection. + * A {@link IWebSocketMessage message} with Pong message data * * @since 6.0 */ -public class ConnectedMessage extends AbstractClientMessage +public class PongMessageMessage extends AbstractClientMessage { - public ConnectedMessage(Application application, String sessionId, IKey key) + private final ByteBuffer byteBuffer; + + /** + * + * @param application + * the Wicket application + * @param sessionId + * the id of the http session + * @param key + * the page id or resource name + * @param byteBuffer + * the message sent from the client + */ + public PongMessageMessage(Application application, String sessionId, IKey key, ByteBuffer byteBuffer) { super(application, sessionId, key); + this.byteBuffer = Args.notNull(byteBuffer, "byteBuffer"); } - @Override - public final String toString() + public ByteBuffer getByteBuffer() { - return "Client is connected"; + return byteBuffer; } } diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/res/js/wicket-websocket-jquery.js b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/res/js/wicket-websocket-jquery.js index 93b7386..75cf05c 100644 --- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/res/js/wicket-websocket-jquery.js +++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/res/js/wicket-websocket-jquery.js @@ -85,17 +85,32 @@ url += '&context=' + encodeURIComponent(WWS.context); } + // this flag is used at server side to send reconnect event + if (WWS.reconnect) { + url += '&reconnect=' + true + } + url += '&wicket-ajax-baseurl=' + encodeURIComponent(WWS.baseUrl); url += '&wicket-app-name=' + encodeURIComponent(WWS.appName); + console.log(url); + self.ws = new WebSocket(url); self.ws.onopen = function (evt) { Wicket.Event.publish(topics.Opened, evt); + if (Wicket.WebSocket.useHeartBeat) { + self.heartbeat(); + } }; self.ws.onmessage = function (event) { + if (WWS.useHeartBeat) { + // reset heartbeat in any message + self.heartbeat(); + } + var message = event.data; if (typeof(message) === 'string' && message.indexOf('<ajax-response>') > -1) { Wicket.channelManager.schedule(Wicket.WebSocket.MESSAGE_CHANNEL, Wicket.bind(function () { @@ -120,6 +135,7 @@ self.ws.onclose = function (evt) { if (self.ws) { self.ws.close(); + clearTimeout(self.pingTimeout); self.ws = null; Wicket.Event.publish(topics.Closed, evt); } @@ -139,6 +155,22 @@ } }, + heartbeat: function () { + clearTimeout(this.pingTimeout); + // Set a timeout in order to check ping received + this.pingTimeout = setTimeout(() => { + this.ws.close(); + // try to reconnect to server + if (Wicket.WebSocket.reconnectOnFailure) + { + Wicket.Log.debug("Trying to reconnect to server"); + Wicket.WebSocket.INSTANCE = null; + Wicket.WebSocket.reconnect = true; + Wicket.WebSocket.createDefaultConnection(); + } + }, Wicket.WebSocket.heartBeatPace + Wicket.WebSocket.networkLatencyThreshold); + }, + send: function (text) { if (this.ws && text) { Wicket.Log.info('[WebSocket.send] Sending: ' + text); diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/res/js/wicket-websocket-setup.js.tmpl b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/res/js/wicket-websocket-setup.js.tmpl index 98e9bc8..ee55ba5 100644 --- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/res/js/wicket-websocket-setup.js.tmpl +++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/res/js/wicket-websocket-setup.js.tmpl @@ -4,7 +4,10 @@ if (typeof(Wicket.WebSocket.appName) === "undefined") { jQuery.extend(Wicket.WebSocket, { pageId: ${pageId}, context: '${context}', resourceName: '${resourceName}', connectionToken: '${connectionToken}', baseUrl: '${baseUrl}', contextPath: '${contextPath}', appName: '${applicationName}', - port: ${port}, securePort: ${securePort}, filterPrefix: '${filterPrefix}', sessionId: '${sessionId}' }); + port: ${port}, securePort: ${securePort}, filterPrefix: '${filterPrefix}', sessionId: '${sessionId}', + useHeartBeat: ${useHeartBeat}, reconnectOnFailure: ${reconnectOnFailure}, + heartBeatPace: ${heartBeatPace}, networkLatencyThreshold: ${networkLatencyThreshold}}); + Wicket.WebSocket.createDefaultConnection(); } })(); diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/timer/AbstractHeartBeatTimer.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/timer/AbstractHeartBeatTimer.java new file mode 100644 index 0000000..4829329 --- /dev/null +++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/timer/AbstractHeartBeatTimer.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.wicket.protocol.ws.timer; + +import java.io.IOException; +import java.util.Date; +import java.util.Timer; +import java.util.TimerTask; + +import org.apache.wicket.Application; +import org.apache.wicket.protocol.ws.WebSocketSettings; +import org.apache.wicket.protocol.ws.api.IWebSocketConnection; +import org.apache.wicket.protocol.ws.concurrent.Executor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AbstractHeartBeatTimer { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractHeartBeatTimer.class); + + protected final WebSocketSettings webSocketSettings; + + // internal heartbeat's timer. + private Timer heartBeatsTimer; + + public AbstractHeartBeatTimer(WebSocketSettings webSocketSettings) + { + this.webSocketSettings = webSocketSettings; + } + + public final void start(Application application) + { + if (isTimerEnabled() == false) + { + return; + } + + if (LOG.isInfoEnabled()) + { + LOG.info("Starting thread pushing heart beats"); + } + + TimerTask timerTask = new TimerTask() + { + @Override + public void run() + { + try + { + sendHeartBeats(application); + } + catch (Exception e) + { + LOG.error("Error while checking connections", e); + } + } + }; + + this.heartBeatsTimer = new Timer(true); + this.heartBeatsTimer.schedule(timerTask, new Date(), webSocketSettings.getHeartBeatPace()); + } + + protected abstract boolean isTimerEnabled(); + + public final void stop() + { + if (LOG.isInfoEnabled()) + { + LOG.info("Stopping thread pushing heart beats"); + } + if (this.heartBeatsTimer != null) + { + this.heartBeatsTimer.cancel(); + } + } + + protected abstract void sendHeartBeats(Application application); +} diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/timer/HeartBeatWithReconnectTimer.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/timer/HeartBeatWithReconnectTimer.java new file mode 100644 index 0000000..6b05fb5 --- /dev/null +++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/timer/HeartBeatWithReconnectTimer.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.wicket.protocol.ws.timer; + +import java.io.IOException; + +import org.apache.wicket.Application; +import org.apache.wicket.protocol.ws.WebSocketSettings; +import org.apache.wicket.protocol.ws.api.IWebSocketConnection; +import org.apache.wicket.protocol.ws.concurrent.Executor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class HeartBeatWithReconnectTimer extends AbstractHeartBeatTimer { + + private static final Logger LOG = LoggerFactory.getLogger(HeartBeatWithReconnectTimer.class); + + + public HeartBeatWithReconnectTimer(WebSocketSettings webSocketSettings) + { + super(webSocketSettings); + } + + + @Override + protected boolean isTimerEnabled() { + if (webSocketSettings.isUseHeartBeat() == false) + { + if (LOG.isInfoEnabled()) + { + LOG.info("useHeartBeat is set to false. Thus we won't start heartbeat's sending thread"); + } + return false; + } + return true; + } + + + protected void sendHeartBeats(Application application) + { + final Executor heartBeatsExecutor = webSocketSettings.getHeartBeatsExecutor(); + final int maxPingRetries = webSocketSettings.getMaxPingRetries(); + for (IWebSocketConnection connection: webSocketSettings.getConnectionRegistry().getConnections(application)) + { + heartBeatsExecutor.run(() -> ping(connection, maxPingRetries)); + } + } + + private void ping(IWebSocketConnection connection, final int pingRetryCounter) + { + try + { + // we just sent a binary message + connection.sendMessage(new byte[]{10}); + // if client does not receive message it might try to reconnect + // depending on settings + } + catch (IOException e) + { + ping(connection, pingRetryCounter - 1); + } + } +} diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/timer/PingPongHeartBeatTimer.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/timer/PingPongHeartBeatTimer.java new file mode 100644 index 0000000..ae24ac4 --- /dev/null +++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/timer/PingPongHeartBeatTimer.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.wicket.protocol.ws.timer; + +import java.io.IOException; + +import org.apache.wicket.Application; +import org.apache.wicket.protocol.ws.WebSocketSettings; +import org.apache.wicket.protocol.ws.api.IWebSocketConnection; +import org.apache.wicket.protocol.ws.concurrent.Executor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PingPongHeartBeatTimer extends AbstractHeartBeatTimer { + + private static final Logger LOG = LoggerFactory.getLogger(PingPongHeartBeatTimer.class); + + public PingPongHeartBeatTimer(WebSocketSettings webSocketSettings) + { + super(webSocketSettings); + } + + @Override + protected boolean isTimerEnabled() { + if (webSocketSettings.isUsePingPongHeartBeat() == false) + { + if (LOG.isInfoEnabled()) + { + LOG.info("usePingPongHeartBeat is set to false. Thus we won't start heartbeat's sending thread"); + } + return false; + } + return true; + } + + + protected void sendHeartBeats(Application application) + { + final long heartBeatPace = webSocketSettings.getHeartBeatPace(); + final long networkLatencyThreshold = webSocketSettings.getNetworkLatencyThreshold(); + final Executor heartBeatsExecutor = webSocketSettings.getHeartBeatsExecutor(); + final int maxPingRetries = webSocketSettings.getMaxPingRetries(); + for (IWebSocketConnection connection: webSocketSettings.getConnectionRegistry().getConnections(application)) + { + // connection didn't received the PONG from peer terminate it + if (connection.isAlive() == false) + { + if (connection.getLastTimeAlive() - System.currentTimeMillis() > (heartBeatPace + networkLatencyThreshold)) + { + heartBeatsExecutor.run(() -> terminateConnection(connection)); + } + } + else + { + heartBeatsExecutor.run(() -> ping(connection, maxPingRetries)); + } + } + } + + private void ping(IWebSocketConnection connection, final int pingRetryCounter) + { + try + { + connection.ping(); + } + catch (IOException e) + { + if (pingRetryCounter == 0) + { + // ping failed enough times kill connection + terminateConnection(connection); + } + else + { + ping(connection, pingRetryCounter - 1); + } + } + } + + private void terminateConnection(IWebSocketConnection connection) + { + connection.setAlive(false); + if (LOG.isInfoEnabled()) + { + LOG.info("Terminating connection with ID {} because ping of remote peer failed {} times", + connection.getKey(), webSocketSettings.getMaxPingRetries()); + } + connection.terminate("Failed to ping remote peer"); + webSocketSettings.getConnectionRegistry().removeConnection(connection.getApplication(), connection.getSessionId(), connection.getKey()); + } +} diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketConnection.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketConnection.java index 1923f49..f9c83cd 100644 --- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketConnection.java +++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketConnection.java @@ -17,6 +17,7 @@ package org.apache.wicket.protocol.ws.util.tester; import java.io.IOException; +import java.nio.ByteBuffer; import org.apache.wicket.Application; import org.apache.wicket.protocol.http.WebApplication; @@ -43,6 +44,41 @@ abstract class TestWebSocketConnection implements IWebSocketConnection } @Override + public long getLastTimeAlive() { + return 0; + } + + @Override + public boolean isAlive() { + return false; + } + + @Override + public void setAlive(boolean alive) { + + } + + @Override + public void ping() throws IOException { + + } + + @Override + public void pong() throws IOException { + + } + + @Override + public void onPong(ByteBuffer byteBuffer) { + + } + + @Override + public void terminate(String reason) { + close(-1, "abnormally closed"); + } + + @Override public boolean isOpen() { return isOpen; @@ -70,6 +106,13 @@ abstract class TestWebSocketConnection implements IWebSocketConnection return this; } + @Override + public IWebSocketConnection sendMessage(byte[] message) throws IOException { + checkOpenness(); + onOutMessage(message); + return this; + } + /** * A callback method that is called when a text message should be send to the client * @@ -79,7 +122,7 @@ abstract class TestWebSocketConnection implements IWebSocketConnection protected abstract void onOutMessage(String message); /** - * A callback method that is called when a text message should be send to the client + * A callback method that is called when a text message should be sent to the client * * @param message * the binary message to deliver to the client @@ -90,6 +133,14 @@ abstract class TestWebSocketConnection implements IWebSocketConnection */ protected abstract void onOutMessage(byte[] message, int offset, int length); + /** + * A callback method that is called when a text message should be sent to the client + * + * @param message + * the binary message to deliver to the client + */ + protected abstract void onOutMessage(byte[] message); + private void checkOpenness() { if (isOpen() == false) diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketProcessor.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketProcessor.java index cc121e6..78994e2 100644 --- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketProcessor.java +++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketProcessor.java @@ -16,6 +16,8 @@ */ package org.apache.wicket.protocol.ws.util.tester; +import java.io.IOException; + import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpSession; @@ -24,6 +26,7 @@ import org.apache.wicket.Page; import org.apache.wicket.protocol.http.WebApplication; import org.apache.wicket.protocol.http.mock.MockHttpServletRequest; import org.apache.wicket.protocol.ws.api.AbstractWebSocketProcessor; +import org.apache.wicket.protocol.ws.api.IWebSocketConnection; import org.apache.wicket.protocol.ws.api.message.IWebSocketPushMessage; import org.apache.wicket.request.http.WebRequest; import org.apache.wicket.util.lang.Args; @@ -154,11 +157,16 @@ abstract class TestWebSocketProcessor extends AbstractWebSocketProcessor } @Override + protected void onOutMessage(byte[] message) { + TestWebSocketProcessor.this.onOutMessage(message); + } + + @Override public void sendMessage(IWebSocketPushMessage message) { TestWebSocketProcessor.this.broadcastMessage(message); } - }); + }, false); } /** @@ -180,4 +188,12 @@ abstract class TestWebSocketProcessor extends AbstractWebSocketProcessor * the length of bytes to read from the binary message */ protected abstract void onOutMessage(byte[] message, int offset, int length); + + /** + * A callback method that is being called when a binary message is written to the TestWebSocketConnection + * + * @param message + * the binary message to deliver to the client + */ + protected abstract void onOutMessage(byte[] message); } diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/WebSocketTester.java b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/WebSocketTester.java index df67eac..472a2c9 100644 --- a/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/WebSocketTester.java +++ b/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/WebSocketTester.java @@ -72,6 +72,12 @@ public class WebSocketTester { WebSocketTester.this.onOutMessage(message, offset, length); } + + @Override + protected void onOutMessage(byte[] message) + { + WebSocketTester.this.onOutMessage(message); + } }; socketProcessor.onOpen(null); } @@ -112,6 +118,12 @@ public class WebSocketTester { WebSocketTester.this.onOutMessage(message, offset, length); } + + @Override + protected void onOutMessage(byte[] message) + { + WebSocketTester.this.onOutMessage(message); + } }; socketProcessor.onOpen(null); } @@ -205,4 +217,14 @@ public class WebSocketTester protected void onOutMessage(byte[] message, int offset, int length) { } + + /** + * A callback method which may be overritten to receive messages pushed by the server + * + * @param message + * the pushed binary message from the server + */ + protected void onOutMessage(byte[] message) + { + } } diff --git a/wicket-native-websocket/wicket-native-websocket-core/src/test/java/org/apache/wicket/protocol/ws/util/tester/WebSocketTesterProcessorTest.java b/wicket-native-websocket/wicket-native-websocket-core/src/test/java/org/apache/wicket/protocol/ws/util/tester/WebSocketTesterProcessorTest.java index 0b807fb..fcdb180 100644 --- a/wicket-native-websocket/wicket-native-websocket-core/src/test/java/org/apache/wicket/protocol/ws/util/tester/WebSocketTesterProcessorTest.java +++ b/wicket-native-websocket/wicket-native-websocket-core/src/test/java/org/apache/wicket/protocol/ws/util/tester/WebSocketTesterProcessorTest.java @@ -59,6 +59,11 @@ public class WebSocketTesterProcessorTest { messageReceived.set(true); } + + @Override + protected void onOutMessage(byte[] message) { + messageReceived.set(true); + } } WicketTester tester; diff --git a/wicket-native-websocket/wicket-native-websocket-javax/src/main/java/org/apache/wicket/protocol/ws/javax/JavaxWebSocketConnection.java b/wicket-native-websocket/wicket-native-websocket-javax/src/main/java/org/apache/wicket/protocol/ws/javax/JavaxWebSocketConnection.java index f7e4ca4..3fc9817 100644 --- a/wicket-native-websocket/wicket-native-websocket-javax/src/main/java/org/apache/wicket/protocol/ws/javax/JavaxWebSocketConnection.java +++ b/wicket-native-websocket/wicket-native-websocket-javax/src/main/java/org/apache/wicket/protocol/ws/javax/JavaxWebSocketConnection.java @@ -18,6 +18,8 @@ package org.apache.wicket.protocol.ws.javax; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import javax.websocket.CloseReason; import javax.websocket.Session; @@ -40,6 +42,9 @@ public class JavaxWebSocketConnection extends AbstractWebSocketConnection private final Session session; + private final AtomicBoolean alive = new AtomicBoolean(false); + private final AtomicLong lastTimeAlive = new AtomicLong(System.currentTimeMillis()); + /** * Constructor. * @@ -50,6 +55,69 @@ public class JavaxWebSocketConnection extends AbstractWebSocketConnection { super(webSocketProcessor); this.session = Args.notNull(session, "session"); + setAlive(true); + } + + @Override + public long getLastTimeAlive() + { + return lastTimeAlive.get(); + } + + @Override + public boolean isAlive() + { + return alive.get(); + } + + @Override + public void setAlive(boolean alive) + { + if (alive) + { + // is connection if alive we set the timestamp. + this.lastTimeAlive.set(System.currentTimeMillis()); + } + this.alive.set(alive); + } + + @Override + public synchronized void terminate(String reason) + { + close(CloseReason.CloseCodes.CLOSED_ABNORMALLY.getCode(), reason); + } + + @Override + public void ping() throws IOException + { + if (LOG.isDebugEnabled()) + { + LOG.debug("Pinging connection {}", getKey()); + } + ByteBuffer buf = ByteBuffer.wrap(new byte[]{0xA}); + session.getBasicRemote().sendPing(buf); + } + + @Override + public void pong() throws IOException + { + if (LOG.isDebugEnabled()) + { + LOG.debug("Sending unidirectional pon for connection {}", getKey()); + } + ByteBuffer buf = ByteBuffer.wrap(new byte[]{0xA}); + session.getBasicRemote().sendPong(buf); + } + + @Override + public void onPong(ByteBuffer byteBuffer) + { + if (LOG.isDebugEnabled()) + { + LOG.debug("Pong receive for {} with contents {}", getKey(), byteBuffer.array()); + } + // we received pong answer from remote peer. Thus, connection is alive + setAlive(true); } @Override @@ -66,7 +134,8 @@ public class JavaxWebSocketConnection extends AbstractWebSocketConnection try { session.close(new CloseReason(new CloseCode(code), reason)); - } catch (IOException iox) + } + catch (IOException iox) { LOG.error("An error occurred while closing WebSocket session", iox); } @@ -93,6 +162,15 @@ public class JavaxWebSocketConnection extends AbstractWebSocketConnection return this; } + @Override + public IWebSocketConnection sendMessage(byte[] message) throws IOException { + checkClosed(); + + ByteBuffer buf = ByteBuffer.wrap(message); + session.getBasicRemote().sendBinary(buf); + return this; + } + private void checkClosed() { if (!isOpen()) diff --git a/wicket-native-websocket/wicket-native-websocket-javax/src/main/java/org/apache/wicket/protocol/ws/javax/JavaxWebSocketProcessor.java b/wicket-native-websocket/wicket-native-websocket-javax/src/main/java/org/apache/wicket/protocol/ws/javax/JavaxWebSocketProcessor.java index 4d2e052..29fa130 100644 --- a/wicket-native-websocket/wicket-native-websocket-javax/src/main/java/org/apache/wicket/protocol/ws/javax/JavaxWebSocketProcessor.java +++ b/wicket-native-websocket/wicket-native-websocket-javax/src/main/java/org/apache/wicket/protocol/ws/javax/JavaxWebSocketProcessor.java @@ -17,15 +17,19 @@ package org.apache.wicket.protocol.ws.javax; import java.nio.ByteBuffer; +import java.util.List; import javax.websocket.EndpointConfig; import javax.websocket.MessageHandler; +import javax.websocket.PongMessage; import javax.websocket.Session; import org.apache.wicket.protocol.http.WebApplication; import org.apache.wicket.protocol.ws.WebSocketSettings; import org.apache.wicket.protocol.ws.api.AbstractWebSocketProcessor; import org.apache.wicket.protocol.ws.api.IWebSocketSession; +import org.apache.wicket.protocol.ws.api.message.TextMessage; +import org.apache.wicket.protocol.ws.api.registry.IKey; /** * An {@link org.apache.wicket.protocol.ws.api.IWebSocketProcessor processor} that integrates with @@ -49,10 +53,12 @@ public class JavaxWebSocketProcessor extends AbstractWebSocketProcessor { super(new JavaxUpgradeHttpRequest(session, endpointConfig), application); - onConnect(new JavaxWebSocketConnection(session, this)); + List<String> reconnect = session.getRequestParameterMap().get("reconnect"); + onConnect(new JavaxWebSocketConnection(session, this), reconnect != null && !reconnect.isEmpty()); session.addMessageHandler(new StringMessageHandler()); session.addMessageHandler(new BinaryMessageHandler()); + session.addMessageHandler(new PongMessageMessageHandler()); } @Override @@ -60,6 +66,17 @@ public class JavaxWebSocketProcessor extends AbstractWebSocketProcessor { } + + private class PongMessageMessageHandler implements MessageHandler.Whole<PongMessage> + { + @Override + public void onMessage(PongMessage message) + { + JavaxWebSocketProcessor.this.onPong(message.getApplicationData()); + } + } + + private class StringMessageHandler implements MessageHandler.Whole<String> { @Override
