Updated Branches: refs/heads/4832 [created] 22325a694
WICKET-4832 Websocket broadcast support Slightly improved from Pull Request https://github.com/apache/wicket/pull/24 Project: http://git-wip-us.apache.org/repos/asf/wicket/repo Commit: http://git-wip-us.apache.org/repos/asf/wicket/commit/22325a69 Tree: http://git-wip-us.apache.org/repos/asf/wicket/tree/22325a69 Diff: http://git-wip-us.apache.org/repos/asf/wicket/diff/22325a69 Branch: refs/heads/4832 Commit: 22325a694d861e65121b09976d575d63dbbb00ad Parents: aaf684d Author: Martin Tzvetanov Grigorov <[email protected]> Authored: Thu Dec 6 14:40:43 2012 +0100 Committer: Martin Tzvetanov Grigorov <[email protected]> Committed: Thu Dec 6 14:40:43 2012 +0100 ---------------------------------------------------------------------- .../wicket/protocol/ws/IWebSocketSettings.java | 90 +++++++++++ .../wicket/protocol/ws/WebSocketSettings.java | 86 ++++++++++ .../ws/api/AbstractWebSocketConnection.java | 45 +++++ .../ws/api/AbstractWebSocketProcessor.java | 23 ++- .../protocol/ws/api/IWebSocketConnection.java | 12 ++ .../ws/api/IWebSocketConnectionRegistry.java | 8 + .../ws/api/SimpleWebSocketConnectionRegistry.java | 25 +++ .../protocol/ws/api/WebSocketPushBroadcaster.java | 125 +++++++++++++++ .../wicket/protocol/ws/api/WebSocketResponse.java | 2 +- .../ws/api/event/WebSocketPushPayload.java | 44 +++++ .../ws/api/message/IWebSocketPushMessage.java | 26 +++ .../wicket/protocol/ws/concurrent/Executor.java | 42 +++++ .../ws/util/tester/TestWebSocketProcessor.java | 15 ++ .../protocol/ws/util/tester/WebSocketTester.java | 17 ++ .../ws/jetty/JettyWebSocketConnection.java | 7 +- .../protocol/ws/jetty/JettyWebSocketProcessor.java | 2 +- .../ws/jetty/Jetty9WebSocketConnection.java | 9 +- .../ws/jetty/Jetty9WebSocketProcessor.java | 2 +- .../ws/tomcat7/TomcatWebSocketConnection.java | 7 +- .../ws/tomcat7/TomcatWebSocketProcessor.java | 2 +- 20 files changed, 570 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/wicket/blob/22325a69/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/IWebSocketSettings.java ---------------------------------------------------------------------- diff --git a/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/IWebSocketSettings.java b/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/IWebSocketSettings.java new file mode 100644 index 0000000..9d64f06 --- /dev/null +++ b/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/IWebSocketSettings.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.wicket.protocol.ws; + + +import org.apache.wicket.Application; +import org.apache.wicket.MetaDataKey; +import org.apache.wicket.protocol.ws.api.IWebSocketConnectionRegistry; +import org.apache.wicket.protocol.ws.concurrent.Executor; + +/** + * Interface for websocket related settings. 
+ * <p> + * More documentation is available about each setting in the setter method for the property. + * + * @since 6.4 + */ +public interface IWebSocketSettings +{ + public static final MetaDataKey<IWebSocketSettings> KEY = new MetaDataKey<IWebSocketSettings>() + { + }; + + /** + * Holds this IWebSocketSettings in the Application's metadata. + * This way wicket-core module doesn't have a reference to wicket-native-websocket. + */ + public static final class Holder + { + public IWebSocketSettings get(Application application) + { + IWebSocketSettings settings = application.getMetaData(KEY); + if (settings == null) + { + synchronized (application) + { + if (settings == null) + { + settings = new WebSocketSettings(); + set(application, settings); + } + } + } + return settings; + } + + public void set(Application application, IWebSocketSettings settings) + { + application.setMetaData(KEY, settings); + } + } + + public static final Holder HOLDER = new Holder(); + + /** + * The executor for processing websocket push messages broadcast to all sessions. + * + * @return + * The executor used for processing push messages. + */ + Executor getWebSocketPushMessageExecutor(); + + /** + * Set the executor for processing websocket push messages broadcast to all sessions. + * Default executor does all the processing in the caller thread. Using a proper thread pool is advised + * for applications that send push events from ajax calls to avoid page-level deadlocks. + * + * @param executorService + * The executor used for processing push messages. 
+ */ + IWebSocketSettings setWebSocketPushMessageExecutor(Executor executorService); + + IWebSocketConnectionRegistry getConnectionRegistry(); + + IWebSocketSettings setConnectionRegistry(IWebSocketConnectionRegistry connectionRegistry); +} http://git-wip-us.apache.org/repos/asf/wicket/blob/22325a69/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java ---------------------------------------------------------------------- diff --git a/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java b/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java new file mode 100644 index 0000000..3c7180d --- /dev/null +++ b/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/WebSocketSettings.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.wicket.protocol.ws; + +import java.util.concurrent.Callable; + +import org.apache.wicket.protocol.ws.api.IWebSocketConnectionRegistry; +import org.apache.wicket.protocol.ws.api.SimpleWebSocketConnectionRegistry; +import org.apache.wicket.protocol.ws.concurrent.Executor; +import org.apache.wicket.util.lang.Args; + +/** + * + */ +public class WebSocketSettings implements IWebSocketSettings +{ + /** + * The executor that handles the processing of Web Socket push message broadcasts. + */ + private Executor webSocketPushMessageExecutor = new SameThreadExecutor(); + + /** + * Tracks all currently connected WebSocket clients + */ + private IWebSocketConnectionRegistry connectionRegistry = new SimpleWebSocketConnectionRegistry(); + + @Override + public IWebSocketSettings setWebSocketPushMessageExecutor(Executor executor) + { + Args.notNull(executor, "executor"); + this.webSocketPushMessageExecutor = executor; + return this; + } + + @Override + public IWebSocketConnectionRegistry getConnectionRegistry() + { + return connectionRegistry; + } + + @Override + public IWebSocketSettings setConnectionRegistry(IWebSocketConnectionRegistry connectionRegistry) + { + Args.notNull(connectionRegistry, "connectionRegistry"); + this.connectionRegistry = connectionRegistry; + return this; + } + + @Override + public Executor getWebSocketPushMessageExecutor() + { + return webSocketPushMessageExecutor; + } + + /** + * Simple executor that runs the tasks in the caller thread. 
+ */ + static final class SameThreadExecutor implements Executor + { + @Override + public void run(Runnable command) + { + command.run(); + } + + @Override + public <T> T call(Callable<T> callable) throws Exception + { + return callable.call(); + } + } +} http://git-wip-us.apache.org/repos/asf/wicket/blob/22325a69/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketConnection.java ---------------------------------------------------------------------- diff --git a/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketConnection.java b/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketConnection.java new file mode 100644 index 0000000..02ae01c --- /dev/null +++ b/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketConnection.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.wicket.protocol.ws.api; + +import org.apache.wicket.protocol.ws.api.message.IWebSocketPushMessage; +import org.apache.wicket.util.lang.Args; + +/** + * Abstract class handling the Web Socket broadcast messages. + */ +public abstract class AbstractWebSocketConnection implements IWebSocketConnection +{ + private final AbstractWebSocketProcessor webSocketProcessor; + + /** + * Constructor. + * + * @param webSocketProcessor + * the web socket processor to delegate to + */ + public AbstractWebSocketConnection(AbstractWebSocketProcessor webSocketProcessor) + { + this.webSocketProcessor = Args.notNull(webSocketProcessor, "webSocketProcessor"); + } + + @Override + public void sendMessage(IWebSocketPushMessage message) + { + webSocketProcessor.broadcastMessage(message); + } +} http://git-wip-us.apache.org/repos/asf/wicket/blob/22325a69/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketProcessor.java ---------------------------------------------------------------------- diff --git a/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketProcessor.java b/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketProcessor.java index 07c4fd3..7b58d11 100644 --- a/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketProcessor.java +++ b/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/AbstractWebSocketProcessor.java @@ -25,15 +25,18 @@ import org.apache.wicket.ThreadContext; import org.apache.wicket.ajax.WebSocketRequestHandler; import org.apache.wicket.event.Broadcast; import org.apache.wicket.page.IPageManager; +import 
org.apache.wicket.protocol.ws.IWebSocketSettings; import org.apache.wicket.protocol.ws.api.event.WebSocketBinaryPayload; import org.apache.wicket.protocol.ws.api.event.WebSocketClosedPayload; import org.apache.wicket.protocol.ws.api.event.WebSocketConnectedPayload; import org.apache.wicket.protocol.ws.api.event.WebSocketPayload; +import org.apache.wicket.protocol.ws.api.event.WebSocketPushPayload; import org.apache.wicket.protocol.ws.api.event.WebSocketTextPayload; import org.apache.wicket.protocol.ws.api.message.BinaryMessage; import org.apache.wicket.protocol.ws.api.message.ClosedMessage; import org.apache.wicket.protocol.ws.api.message.ConnectedMessage; import org.apache.wicket.protocol.ws.api.message.IWebSocketMessage; +import org.apache.wicket.protocol.ws.api.message.IWebSocketPushMessage; import org.apache.wicket.protocol.ws.api.message.TextMessage; import org.apache.wicket.request.cycle.RequestCycle; import org.apache.wicket.request.cycle.RequestCycleContext; @@ -80,7 +83,8 @@ public abstract class AbstractWebSocketProcessor implements IWebSocketProcessor this.webRequest = new WebSocketRequest(new ServletRequestCopy(request)); this.application = Args.notNull(application, "application"); - this.connectionRegistry = new SimpleWebSocketConnectionRegistry(); + IWebSocketSettings webSocketSettings = IWebSocketSettings.HOLDER.get(application); + this.connectionRegistry = webSocketSettings.getConnectionRegistry(); } @Override @@ -129,7 +133,7 @@ public abstract class AbstractWebSocketProcessor implements IWebSocketProcessor * @param message * the message to broadcast */ - protected final void broadcastMessage(final IWebSocketMessage message) + public final void broadcastMessage(final IWebSocketMessage message) { IWebSocketConnection connection = connectionRegistry.getConnection(application, sessionId, pageId); @@ -143,7 +147,7 @@ public abstract class AbstractWebSocketProcessor implements IWebSocketProcessor try { RequestCycle requestCycle; - if (oldRequestCycle == 
null) + if (oldRequestCycle == null || message instanceof IWebSocketPushMessage) { RequestCycleContext context = new RequestCycleContext(webRequest, webResponse, application.getRootRequestMapper(), application.getExceptionMapperProvider().get()); @@ -159,7 +163,7 @@ public abstract class AbstractWebSocketProcessor implements IWebSocketProcessor ThreadContext.setApplication(application); Session session; - if (oldSession == null) + if (oldSession == null || message instanceof IWebSocketPushMessage) { ISessionStore sessionStore = application.getSessionStore(); session = sessionStore.lookup(webRequest); @@ -174,15 +178,15 @@ public abstract class AbstractWebSocketProcessor implements IWebSocketProcessor try { Page page = (Page) pageManager.getPage(pageId); - WebSocketRequestHandler target = new WebSocketRequestHandler(page, connection); + WebSocketRequestHandler requestHandler = new WebSocketRequestHandler(page, connection); - WebSocketPayload payload = createEventPayload(message, target); + WebSocketPayload payload = createEventPayload(message, requestHandler); page.send(application, Broadcast.BREADTH, payload); if (!(message instanceof ConnectedMessage || message instanceof ClosedMessage)) { - target.respond(requestCycle); + requestHandler.respond(requestCycle); } } finally @@ -243,11 +247,14 @@ public abstract class AbstractWebSocketProcessor implements IWebSocketProcessor { payload = new WebSocketClosedPayload((ClosedMessage) message, handler); } + else if (message instanceof IWebSocketPushMessage) + { + payload = new WebSocketPushPayload((IWebSocketPushMessage) message, handler); + } else { throw new IllegalArgumentException("Unsupported message type: " + message.getClass().getName()); } return payload; } - } http://git-wip-us.apache.org/repos/asf/wicket/blob/22325a69/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketConnection.java 
---------------------------------------------------------------------- diff --git a/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketConnection.java b/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketConnection.java index 37a91bf..7a492df 100644 --- a/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketConnection.java +++ b/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketConnection.java @@ -17,6 +17,7 @@ package org.apache.wicket.protocol.ws.api; import java.io.IOException; +import org.apache.wicket.protocol.ws.api.message.IWebSocketPushMessage; /** * Common interface for native WebSocket connections @@ -64,4 +65,15 @@ public interface IWebSocketConnection * @throws IOException when an IO error occurs during the write to the client */ IWebSocketConnection sendMessage(byte[] message, int offset, int length) throws IOException; + + /** + * Broadcasts a push message to the wicket page (and its components) associated with this + * connection. The components can then send messages or component updates to the client by adding + * them to the target. 
+ * + * @param message + * the push message to send + * @since 6.4 + */ + void sendMessage(IWebSocketPushMessage message); } http://git-wip-us.apache.org/repos/asf/wicket/blob/22325a69/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketConnectionRegistry.java ---------------------------------------------------------------------- diff --git a/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketConnectionRegistry.java b/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketConnectionRegistry.java index 30b08df..f2de9ad 100644 --- a/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketConnectionRegistry.java +++ b/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/IWebSocketConnectionRegistry.java @@ -16,6 +16,7 @@ */ package org.apache.wicket.protocol.ws.api; +import java.util.Collection; import org.apache.wicket.Application; /** @@ -37,6 +38,13 @@ public interface IWebSocketConnectionRegistry IWebSocketConnection getConnection(Application application, String sessionId, Integer pageId); /** + * @param application + * the web application to look in + * @return collection of web socket connections used by any client connected to the specified application + */ + Collection<IWebSocketConnection> getConnections(Application application); + + /** * Adds a new connection into the registry at the specified coordinates (application+session+page) * * @param application http://git-wip-us.apache.org/repos/asf/wicket/blob/22325a69/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/SimpleWebSocketConnectionRegistry.java 
---------------------------------------------------------------------- diff --git a/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/SimpleWebSocketConnectionRegistry.java b/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/SimpleWebSocketConnectionRegistry.java index 63242a6..97c590f 100644 --- a/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/SimpleWebSocketConnectionRegistry.java +++ b/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/SimpleWebSocketConnectionRegistry.java @@ -16,6 +16,8 @@ */ package org.apache.wicket.protocol.ws.api; +import java.util.ArrayList; +import java.util.Collection; import java.util.concurrent.ConcurrentMap; import org.apache.wicket.Application; @@ -58,6 +60,29 @@ public class SimpleWebSocketConnectionRegistry implements IWebSocketConnectionRe return connection; } + /** + * Returns a collection of currently active websockets. The connections might close at any time. 
+ * + * @param application + * @return + */ + public Collection<IWebSocketConnection> getConnections(Application application) + { + Args.notNull(application, "application"); + + Collection<IWebSocketConnection> connections = new ArrayList<IWebSocketConnection>(); + ConcurrentMap<String, ConcurrentMap<Integer, IWebSocketConnection>> connectionsBySession = application.getMetaData(KEY); + if (connectionsBySession != null) + { + for (ConcurrentMap<Integer, IWebSocketConnection> connectionsByPage : connectionsBySession.values()) + { + + connections.addAll(connectionsByPage.values()); + } + } + return connections; + } + @Override public void setConnection(Application application, String sessionId, Integer pageId, IWebSocketConnection connection) { http://git-wip-us.apache.org/repos/asf/wicket/blob/22325a69/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketPushBroadcaster.java ---------------------------------------------------------------------- diff --git a/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketPushBroadcaster.java b/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketPushBroadcaster.java new file mode 100644 index 0000000..1d3d9a9 --- /dev/null +++ b/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketPushBroadcaster.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.wicket.protocol.ws.api; + +import static java.util.Collections.singletonList; + +import java.util.Collection; + +import org.apache.wicket.Application; +import org.apache.wicket.protocol.ws.IWebSocketSettings; +import org.apache.wicket.protocol.ws.api.message.ConnectedMessage; +import org.apache.wicket.protocol.ws.api.message.IWebSocketPushMessage; +import org.apache.wicket.protocol.ws.concurrent.Executor; +import org.apache.wicket.util.lang.Args; + +/** + * Allows pushing events for processing to Pages that have active web sockets. + * + * @since 6.4 + * @author Mikko Tiihonen + */ +public class WebSocketPushBroadcaster +{ + private final IWebSocketConnectionRegistry registry; + + public WebSocketPushBroadcaster(IWebSocketConnectionRegistry registry) + { + Args.notNull(registry, "registry"); + + this.registry = registry; + } + + /** + * Processes the given message in the page and session identified by the given websocket connection. + * The message is sent as an event to the Page and components of the session allowing the components + * to be updated. + * + * This method can be invoked from any thread, even a non-wicket thread. By default all processing + * is done in the caller thread. Use + * {@link IWebSocketSettings#setWebSocketPushMessageExecutor(org.apache.wicket.protocol.ws.concurrent.Executor)} + * to move processing to background threads. + * + * If the given connection is no longer open then the broadcast is silently ignored. 
+ * + * @param connection + * The Web Socket connection that identifies the page and session + * @param message + * The push message event + */ + public void broadcast(ConnectedMessage connection, IWebSocketPushMessage message) + { + Args.notNull(connection, "connection"); + Args.notNull(message, "message"); + + IWebSocketConnection wsConnection = registry.getConnection(connection.getApplication(), + connection.getSessionId(), connection.getPageId()); + if (wsConnection == null) + { + return; + } + process(connection.getApplication(), singletonList(wsConnection), message); + } + + /** + * Processes the given message in all pages that have active Web Socket connections. + * The message is sent as an event to the Page and components of the session allowing the components + * to be updated. + * + * This method can be invoked from any thread, even a non-wicket thread. By default all processing + * is done in the caller thread. Use + * {@link IWebSocketSettings#setWebSocketPushMessageExecutor(org.apache.wicket.protocol.ws.concurrent.Executor)} + * to move processing to background threads. + * + * If some connections are not in valid state they are silently ignored. 
+ * + * @param application + * The wicket application + * @param message + * The push message event + */ + public void broadcastAll(Application application, IWebSocketPushMessage message) + { + Args.notNull(application, "application"); + Args.notNull(message, "message"); + + Collection<IWebSocketConnection> wsConnections = registry.getConnections(application); + if (wsConnections == null) + { + return; + } + process(application, wsConnections, message); + } + + private void process(final Application application, final Collection<IWebSocketConnection> wsConnections, + final IWebSocketPushMessage message) + { + IWebSocketSettings webSocketSettings = IWebSocketSettings.HOLDER.get(application); + Executor executor = webSocketSettings.getWebSocketPushMessageExecutor(); + for (final IWebSocketConnection wsConnection : wsConnections) + { + executor.run(new Runnable() + { + @Override + public void run() + { + wsConnection.sendMessage(message); + } + }); + } + } +} http://git-wip-us.apache.org/repos/asf/wicket/blob/22325a69/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketResponse.java ---------------------------------------------------------------------- diff --git a/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketResponse.java b/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketResponse.java index 8659936..4d43f97 100644 --- a/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketResponse.java +++ b/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/WebSocketResponse.java @@ -28,7 +28,7 @@ import org.slf4j.LoggerFactory; * when Wicket thread locals are available. 
* * When the thread locals are not available then you can write directly to the {@link IWebSocketConnection} - * taken from {@link SimpleWebSocketConnectionRegistry}. In this case the response wont be cached. + * taken from {@link IWebSocketConnectionRegistry}. In this case the response wont be cached. * * @since 6.0 */ http://git-wip-us.apache.org/repos/asf/wicket/blob/22325a69/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/event/WebSocketPushPayload.java ---------------------------------------------------------------------- diff --git a/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/event/WebSocketPushPayload.java b/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/event/WebSocketPushPayload.java new file mode 100644 index 0000000..4e05f68 --- /dev/null +++ b/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/event/WebSocketPushPayload.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.wicket.protocol.ws.api.event; + +import org.apache.wicket.ajax.WebSocketRequestHandler; +import org.apache.wicket.protocol.ws.api.message.IWebSocketPushMessage; +import org.apache.wicket.util.lang.Args; + +/** + * An event broadcasting payload for the case when an external source triggers a web socket push. + * + * @since 6.4 + */ +public class WebSocketPushPayload extends WebSocketPayload<IWebSocketPushMessage> +{ + private final IWebSocketPushMessage data; + + public WebSocketPushPayload(IWebSocketPushMessage data, WebSocketRequestHandler handler) + { + super(handler); + this.data = Args.notNull(data, "data"); + } + + @Override + public final IWebSocketPushMessage getMessage() + { + return data; + } + +} http://git-wip-us.apache.org/repos/asf/wicket/blob/22325a69/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/message/IWebSocketPushMessage.java ---------------------------------------------------------------------- diff --git a/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/message/IWebSocketPushMessage.java b/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/message/IWebSocketPushMessage.java new file mode 100644 index 0000000..1bd7537 --- /dev/null +++ b/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/api/message/IWebSocketPushMessage.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.wicket.protocol.ws.api.message; + +/** + * A marker interface for all Web Socket push messages. + * + * @since 6.4 + */ +public interface IWebSocketPushMessage extends IWebSocketMessage { + +} http://git-wip-us.apache.org/repos/asf/wicket/blob/22325a69/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/concurrent/Executor.java ---------------------------------------------------------------------- diff --git a/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/concurrent/Executor.java b/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/concurrent/Executor.java new file mode 100644 index 0000000..8d6c4ff --- /dev/null +++ b/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/concurrent/Executor.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.wicket.protocol.ws.concurrent; + +import java.util.concurrent.Callable; + +/** + * An abstraction over all available executor services. + * The application may use {@link java.util.concurrent.Executor} or + * Akka/Scala 2.10 ExecutionContext, or anything that serves the same purpose. + */ +public interface Executor +{ + /** + * Runs a simple task that doesn't return a result + * + * @see java.lang.Thread#run() + */ + void run(Runnable command); + + /** + * Computes a result, or throws an exception if unable to do so. + * + * @return computed result + * @throws Exception if unable to compute a result + */ + <T> T call(Callable<T> callable) throws Exception; +} http://git-wip-us.apache.org/repos/asf/wicket/blob/22325a69/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketProcessor.java ---------------------------------------------------------------------- diff --git a/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketProcessor.java b/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketProcessor.java index 071d006..b9e0b5c 100644 --- a/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketProcessor.java +++ 
b/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/TestWebSocketProcessor.java @@ -24,6 +24,7 @@ import org.apache.wicket.Page; import org.apache.wicket.protocol.http.mock.MockHttpServletRequest; import org.apache.wicket.protocol.http.mock.MockHttpSession; import org.apache.wicket.protocol.ws.api.AbstractWebSocketProcessor; +import org.apache.wicket.protocol.ws.api.message.IWebSocketPushMessage; import org.apache.wicket.util.lang.Args; /** @@ -83,10 +84,24 @@ abstract class TestWebSocketProcessor extends AbstractWebSocketProcessor { TestWebSocketProcessor.this.onOutMessage(message, offset, length); } + + @Override + public void sendMessage(IWebSocketPushMessage message) + { + TestWebSocketProcessor.this.onPushMessage(message); + } }); } /** + * A callback method that is being called when a test message is broadcasted by WebSocketPushBroadcaster + * + * @param message + * the message sent to the Page and all its children + */ + protected abstract void onPushMessage(IWebSocketPushMessage message); + + /** * A callback method that is being called when a test message is written to the TestWebSocketConnection * * @param message http://git-wip-us.apache.org/repos/asf/wicket/blob/22325a69/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/WebSocketTester.java ---------------------------------------------------------------------- diff --git a/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/WebSocketTester.java b/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/WebSocketTester.java index 3571427..e7be351 100644 --- 
a/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/WebSocketTester.java +++ b/wicket-experimental/wicket-native-websocket/wicket-native-websocket-core/src/main/java/org/apache/wicket/protocol/ws/util/tester/WebSocketTester.java @@ -18,6 +18,7 @@ package org.apache.wicket.protocol.ws.util.tester; import org.apache.wicket.Page; import org.apache.wicket.protocol.ws.api.IWebSocketProcessor; +import org.apache.wicket.protocol.ws.api.message.IWebSocketPushMessage; import org.apache.wicket.util.lang.Args; /** @@ -44,6 +45,12 @@ public class WebSocketTester socketProcessor = new TestWebSocketProcessor(page) { @Override + protected void onPushMessage(IWebSocketPushMessage message) + { + WebSocketTester.this.onPushMessage(message); + } + + @Override protected void onOutMessage(String message) { WebSocketTester.this.onOutMessage(message); @@ -112,4 +119,14 @@ public class WebSocketTester protected void onOutMessage(byte[] message, int offset, int length) { } + + /** + * A callback method which may be overridden to receive messages pushed by WebSocketPushBroadcaster + * + * @param message + * the pushed message to the page + */ + public void onPushMessage(IWebSocketPushMessage message) + { + } } http://git-wip-us.apache.org/repos/asf/wicket/blob/22325a69/wicket-experimental/wicket-native-websocket/wicket-native-websocket-jetty/src/main/java/org/apache/wicket/protocol/ws/jetty/JettyWebSocketConnection.java ---------------------------------------------------------------------- diff --git a/wicket-experimental/wicket-native-websocket/wicket-native-websocket-jetty/src/main/java/org/apache/wicket/protocol/ws/jetty/JettyWebSocketConnection.java b/wicket-experimental/wicket-native-websocket/wicket-native-websocket-jetty/src/main/java/org/apache/wicket/protocol/ws/jetty/JettyWebSocketConnection.java index 31eaca9..9f6451e 100644 --- 
a/wicket-experimental/wicket-native-websocket/wicket-native-websocket-jetty/src/main/java/org/apache/wicket/protocol/ws/jetty/JettyWebSocketConnection.java +++ b/wicket-experimental/wicket-native-websocket/wicket-native-websocket-jetty/src/main/java/org/apache/wicket/protocol/ws/jetty/JettyWebSocketConnection.java @@ -18,6 +18,8 @@ package org.apache.wicket.protocol.ws.jetty; import java.io.IOException; +import org.apache.wicket.protocol.ws.api.AbstractWebSocketConnection; +import org.apache.wicket.protocol.ws.api.AbstractWebSocketProcessor; import org.apache.wicket.protocol.ws.api.IWebSocketConnection; import org.apache.wicket.util.lang.Args; import org.eclipse.jetty.websocket.WebSocket; @@ -27,12 +29,13 @@ import org.eclipse.jetty.websocket.WebSocket; * * @since 6.0 */ -public class JettyWebSocketConnection implements IWebSocketConnection +public class JettyWebSocketConnection extends AbstractWebSocketConnection { private WebSocket.Connection connection; - public JettyWebSocketConnection(final WebSocket.Connection connection) + public JettyWebSocketConnection(final WebSocket.Connection connection, final AbstractWebSocketProcessor webSocketProcessor) { + super(webSocketProcessor); this.connection = Args.notNull(connection, "connection"); } http://git-wip-us.apache.org/repos/asf/wicket/blob/22325a69/wicket-experimental/wicket-native-websocket/wicket-native-websocket-jetty/src/main/java/org/apache/wicket/protocol/ws/jetty/JettyWebSocketProcessor.java ---------------------------------------------------------------------- diff --git a/wicket-experimental/wicket-native-websocket/wicket-native-websocket-jetty/src/main/java/org/apache/wicket/protocol/ws/jetty/JettyWebSocketProcessor.java b/wicket-experimental/wicket-native-websocket/wicket-native-websocket-jetty/src/main/java/org/apache/wicket/protocol/ws/jetty/JettyWebSocketProcessor.java index 9280c26..3665611 100644 --- 
a/wicket-experimental/wicket-native-websocket/wicket-native-websocket-jetty/src/main/java/org/apache/wicket/protocol/ws/jetty/JettyWebSocketProcessor.java +++ b/wicket-experimental/wicket-native-websocket/wicket-native-websocket-jetty/src/main/java/org/apache/wicket/protocol/ws/jetty/JettyWebSocketProcessor.java @@ -78,6 +78,6 @@ public class JettyWebSocketProcessor extends AbstractWebSocketProcessor { throw new IllegalArgumentException(JettyWebSocketProcessor.class.getName() + " can work only with " + WebSocket.Connection.class.getName()); } - onConnect(new JettyWebSocketConnection((WebSocket.Connection) connection)); + onConnect(new JettyWebSocketConnection((WebSocket.Connection) connection, this)); } } http://git-wip-us.apache.org/repos/asf/wicket/blob/22325a69/wicket-experimental/wicket-native-websocket/wicket-native-websocket-jetty9/src/main/java/org/apache/wicket/protocol/ws/jetty/Jetty9WebSocketConnection.java ---------------------------------------------------------------------- diff --git a/wicket-experimental/wicket-native-websocket/wicket-native-websocket-jetty9/src/main/java/org/apache/wicket/protocol/ws/jetty/Jetty9WebSocketConnection.java b/wicket-experimental/wicket-native-websocket/wicket-native-websocket-jetty9/src/main/java/org/apache/wicket/protocol/ws/jetty/Jetty9WebSocketConnection.java index bfc5d01..01f5d98 100644 --- a/wicket-experimental/wicket-native-websocket/wicket-native-websocket-jetty9/src/main/java/org/apache/wicket/protocol/ws/jetty/Jetty9WebSocketConnection.java +++ b/wicket-experimental/wicket-native-websocket/wicket-native-websocket-jetty9/src/main/java/org/apache/wicket/protocol/ws/jetty/Jetty9WebSocketConnection.java @@ -20,6 +20,8 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.util.concurrent.ExecutionException; +import org.apache.wicket.protocol.ws.api.AbstractWebSocketConnection; +import org.apache.wicket.protocol.ws.api.AbstractWebSocketProcessor; import 
org.apache.wicket.protocol.ws.api.IWebSocketConnection; import org.apache.wicket.util.lang.Args; import org.eclipse.jetty.util.Callback; @@ -27,11 +29,11 @@ import org.eclipse.jetty.util.FutureCallback; import org.eclipse.jetty.websocket.core.api.WebSocketConnection; /** - * A wrapper around Jetty9's native WebSocket.Connection + * A wrapper around Jetty9's native WebSocketConnection. * * @since 6.2 */ -public class Jetty9WebSocketConnection implements IWebSocketConnection +public class Jetty9WebSocketConnection extends AbstractWebSocketConnection { private final WebSocketConnection connection; @@ -41,8 +43,9 @@ public class Jetty9WebSocketConnection implements IWebSocketConnection * @param connection * the jetty websocket connection */ - public Jetty9WebSocketConnection(WebSocketConnection connection) + public Jetty9WebSocketConnection(WebSocketConnection connection, AbstractWebSocketProcessor webSocketProcessor) { + super(webSocketProcessor); this.connection = Args.notNull(connection, "connection"); } http://git-wip-us.apache.org/repos/asf/wicket/blob/22325a69/wicket-experimental/wicket-native-websocket/wicket-native-websocket-jetty9/src/main/java/org/apache/wicket/protocol/ws/jetty/Jetty9WebSocketProcessor.java ---------------------------------------------------------------------- diff --git a/wicket-experimental/wicket-native-websocket/wicket-native-websocket-jetty9/src/main/java/org/apache/wicket/protocol/ws/jetty/Jetty9WebSocketProcessor.java b/wicket-experimental/wicket-native-websocket/wicket-native-websocket-jetty9/src/main/java/org/apache/wicket/protocol/ws/jetty/Jetty9WebSocketProcessor.java index 7a5b1d4..a2a3734 100644 --- a/wicket-experimental/wicket-native-websocket/wicket-native-websocket-jetty9/src/main/java/org/apache/wicket/protocol/ws/jetty/Jetty9WebSocketProcessor.java +++ b/wicket-experimental/wicket-native-websocket/wicket-native-websocket-jetty9/src/main/java/org/apache/wicket/protocol/ws/jetty/Jetty9WebSocketProcessor.java @@ -62,7 +62,7 @@ 
public class Jetty9WebSocketProcessor extends AbstractWebSocketProcessor @Override public void onWebSocketConnect(WebSocketConnection connection) { - onConnect(new Jetty9WebSocketConnection(connection)); + onConnect(new Jetty9WebSocketConnection(connection, this)); } @Override http://git-wip-us.apache.org/repos/asf/wicket/blob/22325a69/wicket-experimental/wicket-native-websocket/wicket-native-websocket-tomcat/src/main/java/org/apache/wicket/protocol/ws/tomcat7/TomcatWebSocketConnection.java ---------------------------------------------------------------------- diff --git a/wicket-experimental/wicket-native-websocket/wicket-native-websocket-tomcat/src/main/java/org/apache/wicket/protocol/ws/tomcat7/TomcatWebSocketConnection.java b/wicket-experimental/wicket-native-websocket/wicket-native-websocket-tomcat/src/main/java/org/apache/wicket/protocol/ws/tomcat7/TomcatWebSocketConnection.java index 969ed03..01ddfb0 100644 --- a/wicket-experimental/wicket-native-websocket/wicket-native-websocket-tomcat/src/main/java/org/apache/wicket/protocol/ws/tomcat7/TomcatWebSocketConnection.java +++ b/wicket-experimental/wicket-native-websocket/wicket-native-websocket-tomcat/src/main/java/org/apache/wicket/protocol/ws/tomcat7/TomcatWebSocketConnection.java @@ -21,6 +21,8 @@ import java.nio.ByteBuffer; import java.nio.CharBuffer; import org.apache.catalina.websocket.WsOutbound; +import org.apache.wicket.protocol.ws.api.AbstractWebSocketConnection; +import org.apache.wicket.protocol.ws.api.AbstractWebSocketProcessor; import org.apache.wicket.protocol.ws.api.IWebSocketConnection; import org.apache.wicket.util.lang.Args; import org.slf4j.Logger; @@ -31,7 +33,7 @@ import org.slf4j.LoggerFactory; * * @since 6.0 */ -public class TomcatWebSocketConnection implements IWebSocketConnection +public class TomcatWebSocketConnection extends AbstractWebSocketConnection { private static final Logger LOG = LoggerFactory.getLogger(TomcatWebSocketConnection.class); @@ -39,8 +41,9 @@ public class 
TomcatWebSocketConnection implements IWebSocketConnection private boolean closed = false; - public TomcatWebSocketConnection(final WsOutbound connection) + public TomcatWebSocketConnection(final WsOutbound connection, final AbstractWebSocketProcessor webSocketProcessor) { + super(webSocketProcessor); this.connection = Args.notNull(connection, "connection"); } http://git-wip-us.apache.org/repos/asf/wicket/blob/22325a69/wicket-experimental/wicket-native-websocket/wicket-native-websocket-tomcat/src/main/java/org/apache/wicket/protocol/ws/tomcat7/TomcatWebSocketProcessor.java ---------------------------------------------------------------------- diff --git a/wicket-experimental/wicket-native-websocket/wicket-native-websocket-tomcat/src/main/java/org/apache/wicket/protocol/ws/tomcat7/TomcatWebSocketProcessor.java b/wicket-experimental/wicket-native-websocket/wicket-native-websocket-tomcat/src/main/java/org/apache/wicket/protocol/ws/tomcat7/TomcatWebSocketProcessor.java index c2ce7a3..694b46d 100644 --- a/wicket-experimental/wicket-native-websocket/wicket-native-websocket-tomcat/src/main/java/org/apache/wicket/protocol/ws/tomcat7/TomcatWebSocketProcessor.java +++ b/wicket-experimental/wicket-native-websocket/wicket-native-websocket-tomcat/src/main/java/org/apache/wicket/protocol/ws/tomcat7/TomcatWebSocketProcessor.java @@ -77,7 +77,7 @@ public class TomcatWebSocketProcessor extends AbstractWebSocketProcessor throw new IllegalArgumentException(TomcatWebSocketProcessor.class.getName() + " can work only with " + WsOutbound.class.getName()); } - onConnect(new TomcatWebSocketConnection((WsOutbound) containerConnection)); + onConnect(new TomcatWebSocketConnection((WsOutbound) containerConnection, this)); } }
