Author: markt Date: Wed Oct 30 11:24:12 2013 New Revision: 1537046 URL: http://svn.apache.org/r1537046 Log: Fix https://issues.apache.org/bugzilla/show_bug.cgi?id=55715 Add a per web application executor to the WebSocket implementation and use it for calling SendHandler.onResult() when there is a chance that the current thread also initiated the write
Modified: tomcat/tc7.0.x/trunk/ (props changed) tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/Constants.java tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/WsContextListener.java tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/WsHttpUpgradeHandler.java tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/WsServerContainer.java tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/WsWriteTimeout.java tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml tomcat/tc7.0.x/trunk/webapps/docs/web-socket-howto.xml Propchange: tomcat/tc7.0.x/trunk/ ------------------------------------------------------------------------------ Merged /tomcat/trunk:r1537041 Modified: tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/Constants.java URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/Constants.java?rev=1537046&r1=1537045&r2=1537046&view=diff ============================================================================== --- tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/Constants.java (original) +++ tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/Constants.java Wed Oct 30 11:24:12 2013 @@ -31,6 +31,14 @@ public class Constants { public static final String ENFORCE_NO_ADD_AFTER_HANDSHAKE_CONTEXT_INIT_PARAM = "org.apache.tomcat.websocket.noAddAfterHandshake"; + // Executor configuration + public static final String EXECUTOR_CORE_SIZE_INIT_PARAM = + "org.apache.tomcat.websocket.executorCoreSize"; + public static final String EXECUTOR_MAX_SIZE_INIT_PARAM = + "org.apache.tomcat.websocket.executorMaxSize"; + public static final String EXECUTOR_KEEPALIVETIME_SECONDS_INIT_PARAM = + "org.apache.tomcat.websocket.executorKeepAliveTimeSeconds"; + public static final String SERVER_CONTAINER_SERVLET_CONTEXT_ATTRIBUTE = "javax.websocket.server.ServerContainer"; Modified: 
tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/WsContextListener.java URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/WsContextListener.java?rev=1537046&r1=1537045&r2=1537046&view=diff ============================================================================== --- tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/WsContextListener.java (original) +++ tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/WsContextListener.java Wed Oct 30 11:24:12 2013 @@ -45,6 +45,7 @@ public class WsContextListener implement ServletContext sc = sce.getServletContext(); Object obj = sc.getAttribute(Constants.SERVER_CONTAINER_SERVLET_CONTEXT_ATTRIBUTE); if (obj instanceof WsServerContainer) { + ((WsServerContainer) obj).shutdownExecutor(); ((WsServerContainer) obj).destroy(); } } Modified: tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/WsHttpUpgradeHandler.java URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/WsHttpUpgradeHandler.java?rev=1537046&r1=1537045&r2=1537046&view=diff ============================================================================== --- tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/WsHttpUpgradeHandler.java (original) +++ tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/WsHttpUpgradeHandler.java Wed Oct 30 11:24:12 2013 @@ -233,7 +233,9 @@ public class WsHttpUpgradeHandler implem @Override public void onWritePossible() { - wsRemoteEndpointServer.onWritePossible(); + // Triggered by the poller so this isn't the same thread that + // triggered the write so no need for a dispatch + wsRemoteEndpointServer.onWritePossible(false); } Modified: tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java?rev=1537046&r1=1537045&r2=1537046&view=diff 
============================================================================== --- tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java (original) +++ tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java Wed Oct 30 11:24:12 2013 @@ -20,6 +20,9 @@ import java.io.EOFException; import java.io.IOException; import java.net.SocketTimeoutException; import java.nio.ByteBuffer; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; import javax.websocket.SendHandler; import javax.websocket.SendResult; @@ -42,8 +45,12 @@ public class WsRemoteEndpointImplServer private static final Log log = LogFactory.getLog(WsHttpUpgradeHandler.class); + private static final Queue<OnResultRunnable> onResultRunnables = + new ConcurrentLinkedQueue<OnResultRunnable>(); + private final AbstractServletOutputStream sos; private final WsWriteTimeout wsWriteTimeout; + private final ExecutorService executorService; private volatile SendHandler handler = null; private volatile ByteBuffer[] buffers = null; @@ -55,6 +62,7 @@ public class WsRemoteEndpointImplServer WsServerContainer serverContainer) { this.sos = sos; this.wsWriteTimeout = serverContainer.getTimeout(); + this.executorService = serverContainer.getExecutorService(); } @@ -68,11 +76,13 @@ public class WsRemoteEndpointImplServer protected void doWrite(SendHandler handler, ByteBuffer... buffers) { this.handler = handler; this.buffers = buffers; - onWritePossible(); + // This is definitely the same thread that triggered the write so a + // dispatch will be required. 
+ onWritePossible(true); } - public void onWritePossible() { + public void onWritePossible(boolean useDispatch) { boolean complete = true; try { // If this is false there will be a call back when it is true @@ -89,7 +99,7 @@ public class WsRemoteEndpointImplServer } if (complete) { wsWriteTimeout.unregister(this); - clearHandler(null); + clearHandler(null, useDispatch); if (close) { close(); } @@ -99,7 +109,7 @@ public class WsRemoteEndpointImplServer } catch (IOException ioe) { wsWriteTimeout.unregister(this); - clearHandler(ioe); + clearHandler(ioe, useDispatch); close(); } if (!complete) { @@ -118,7 +128,11 @@ public class WsRemoteEndpointImplServer @Override protected void doClose() { if (handler != null) { - clearHandler(new EOFException()); + // close() can be triggered by a wide range of scenarios. It is far + // simpler just to always use a dispatch than it is to try and track + // whether or not this method was called by the same thread that + // triggered the write + clearHandler(new EOFException(), true); } try { sos.close(); @@ -136,15 +150,31 @@ public class WsRemoteEndpointImplServer } - protected void onTimeout() { + /* + * Currently this is only called from the background thread so we could just + * call clearHandler() with useDispatch == false but the method parameter + * was added in case other callers started to use this method to make sure + * that those callers think through what the correct value of useDispatch is + * for them. 
+ */ + protected void onTimeout(boolean useDispatch) { if (handler != null) { - clearHandler(new SocketTimeoutException()); + clearHandler(new SocketTimeoutException(), useDispatch); } close(); } - private void clearHandler(Throwable t) { + /** + * + * @param t The throwable associated with any error that + * occurred + * @param useDispatch Should {@link SendHandler#onResult(SendResult)} be + * called from a new thread, keeping in mind the + * requirements of + * {@link javax.websocket.RemoteEndpoint.Async} + */ + private void clearHandler(Throwable t, boolean useDispatch) { // Setting the result marks this (partial) message as // complete which means the next one may be sent which // could update the value of the handler. Therefore, keep a @@ -153,11 +183,64 @@ public class WsRemoteEndpointImplServer SendHandler sh = handler; handler = null; if (sh != null) { + if (useDispatch) { + OnResultRunnable r = onResultRunnables.poll(); + if (r == null) { + r = new OnResultRunnable(onResultRunnables); + } + r.init(sh, t); + if (executorService == null || executorService.isShutdown()) { + // Can't use the executor so call the runnable directly. + // This may not be strictly specification compliant in all + // cases but during shutdown only close messages are going + // to be sent so there should not be the issue of nested + // calls leading to stack overflow as described in bug + // 55715. The issues with nested calls was the reason for + // the separate thread requirement in the specification. 
+ r.run(); + } else { + executorService.execute(r); + } + } else { + if (t == null) { + sh.onResult(new SendResult()); + } else { + sh.onResult(new SendResult(t)); + } + } + } + } + + + private static class OnResultRunnable implements Runnable { + + private final Queue<OnResultRunnable> queue; + + private volatile SendHandler sh; + private volatile Throwable t; + + private OnResultRunnable(Queue<OnResultRunnable> queue) { + this.queue = queue; + } + + private void init(SendHandler sh, Throwable t) { + this.sh = sh; + this.t = t; + } + + @Override + public void run() { if (t == null) { sh.onResult(new SendResult()); } else { sh.onResult(new SendResult(t)); } + t = null; + sh = null; + // Return the Runnable to the queue when it has been finished with + // Note if this method takes an age to finish there shouldn't be any + // thread safety issues as the fields are cleared above. + queue.add(this); } } } Modified: tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/WsServerContainer.java URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/WsServerContainer.java?rev=1537046&r1=1537045&r2=1537046&view=diff ============================================================================== --- tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/WsServerContainer.java (original) +++ tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/WsServerContainer.java Wed Oct 30 11:24:12 2013 @@ -26,6 +26,12 @@ import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import javax.servlet.DispatcherType; import javax.servlet.FilterRegistration; @@ -81,6 +87,7 @@ public class 
WsServerContainer extends W private volatile boolean addAllowed = true; private final ConcurrentHashMap<String,Set<WsSession>> authenticatedSessions = new ConcurrentHashMap<String, Set<WsSession>>(); + private final ExecutorService executorService; WsServerContainer(ServletContext servletContext) { @@ -104,6 +111,25 @@ public class WsServerContainer extends W if (value != null) { setEnforceNoAddAfterHandshake(Boolean.parseBoolean(value)); } + // Executor config + int executorCoreSize = 0; + int executorMaxSize = 10; + long executorKeepAliveTimeSeconds = 60; + value = servletContext.getInitParameter( + Constants.EXECUTOR_CORE_SIZE_INIT_PARAM); + if (value != null) { + executorCoreSize = Integer.parseInt(value); + } + value = servletContext.getInitParameter( + Constants.EXECUTOR_MAX_SIZE_INIT_PARAM); + if (value != null) { + executorMaxSize = Integer.parseInt(value); + } + value = servletContext.getInitParameter( + Constants.EXECUTOR_KEEPALIVETIME_SECONDS_INIT_PARAM); + if (value != null) { + executorKeepAliveTimeSeconds = Long.parseLong(value); + } FilterRegistration.Dynamic fr = servletContext.addFilter( WsFilter.class.getName(), new WsFilter()); @@ -113,6 +139,22 @@ public class WsServerContainer extends W DispatcherType.FORWARD); fr.addMappingForUrlPatterns(types, true, "/*"); + + // Use a per web application executor for any threads the WebSocket + // server code needs to create. Group all of the threads under a single + // ThreadGroup. 
+ StringBuffer threadGroupName = new StringBuffer("WebSocketServer-"); + if ("".equals(servletContext.getContextPath())) { + threadGroupName.append("ROOT"); + } else { + threadGroupName.append(servletContext.getContextPath()); + } + ThreadGroup threadGroup = new ThreadGroup(threadGroupName.toString()); + WsThreadFactory wsThreadFactory = new WsThreadFactory(threadGroup); + + executorService = new ThreadPoolExecutor(executorCoreSize, + executorMaxSize, executorKeepAliveTimeSeconds, TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>(), wsThreadFactory); } @@ -383,6 +425,21 @@ public class WsServerContainer extends W } } + + ExecutorService getExecutorService() { + return executorService; + } + + + void shutdownExecutor() { + executorService.shutdown(); + try { + executorService.awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + // Ignore the interruption and carry on + } + } + private static void validateEncoders(Class<? extends Encoder>[] encoders) throws DeploymentException { @@ -403,6 +460,7 @@ public class WsServerContainer extends W } } + private static class TemplatePathMatch { private final ServerEndpointConfig config; private final UriTemplate uriTemplate; @@ -449,4 +507,22 @@ public class WsServerContainer extends W tpm2.getUriTemplate().getNormalizedPath()); } } + + + private static class WsThreadFactory implements ThreadFactory { + + private final ThreadGroup tg; + private final AtomicLong count = new AtomicLong(0); + + private WsThreadFactory(ThreadGroup tg) { + this.tg = tg; + } + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(tg, r); + t.setName(tg.getName() + "-" + count.incrementAndGet()); + return t; + } + } } Modified: tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/WsWriteTimeout.java URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/WsWriteTimeout.java?rev=1537046&r1=1537045&r2=1537046&view=diff 
============================================================================== --- tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/WsWriteTimeout.java (original) +++ tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/WsWriteTimeout.java Wed Oct 30 11:24:12 2013 @@ -52,7 +52,9 @@ public class WsWriteTimeout implements B while (iter.hasNext()) { WsRemoteEndpointImplServer endpoint = iter.next(); if (endpoint.getTimeoutExpiry() < now) { - endpoint.onTimeout(); + // Background thread, not the thread that triggered the + // write so no need to use a dispatch + endpoint.onTimeout(false); } else { // Endpoints are ordered by timeout expiry so if this point // is reached there is no need to check the remaining Modified: tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml?rev=1537046&r1=1537045&r2=1537046&view=diff ============================================================================== --- tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml (original) +++ tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml Wed Oct 30 11:24:12 2013 @@ -100,6 +100,12 @@ Define the web-fragment.xml in tomcat7-websocket.jar as a Servlet 3.0 web fragment rather than as a Servlet 3.1 web fragment. (markt) </fix> + <fix> + <bug>55715</bug>: Add a per web application executor to the WebSocket + implementation and use it for calling + <code>SendHandler.onResult()</code> when there is a chance that the + current thread also initiated the write. 
(markt) + </fix> </changelog> </subsection> <subsection name="Coyote"> Modified: tomcat/tc7.0.x/trunk/webapps/docs/web-socket-howto.xml URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/webapps/docs/web-socket-howto.xml?rev=1537046&r1=1537045&r2=1537046&view=diff ============================================================================== --- tomcat/tc7.0.x/trunk/webapps/docs/web-socket-howto.xml (original) +++ tomcat/tc7.0.x/trunk/webapps/docs/web-socket-howto.xml Wed Oct 30 11:24:12 2013 @@ -85,6 +85,25 @@ property to <code>true</code> but any explicit setting on the servlet context will always take priority.</p> +<p>The Java WebSocket 1.0 specification requires that call backs for + asynchronous writes are performed on a different thread to the thread that + initiated the write. Since the container thread pool is not exposed via the + Servlet API, the WebSocket implementation has to provide its own thread pool. + This thread pool is controlled by the following servlet context + initialization parameters:</p> + <ul> + <li><code>org.apache.tomcat.websocket.executorCoreSize</code>: The core + size of the executor thread pool. If not set, the default of 0 (zero) + is used.</li> + <li><code>org.apache.tomcat.websocket.executorMaxSize</code>: The maximum + permitted size of the executor thread pool. If not set, the default of + 10 is used.</li> + <li><code>org.apache.tomcat.websocket.executorKeepAliveTimeSeconds</code>: + The maximum time an idle thread will remain in the executor thread pool + until it is terminated. If not specified, the default of 60 seconds is + used.</li> + </ul> + <p>When using the WebSocket client to connect to server endpoints, the timeout for IO operations while establishing the connection is controlled by the <code>userProperties</code> of the provided --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org