Author: markt
Date: Wed Oct 30 11:24:12 2013
New Revision: 1537046

URL: http://svn.apache.org/r1537046
Log:
Fix https://issues.apache.org/bugzilla/show_bug.cgi?id=55715
Add a per web application executor to the WebSocket implementation and use it 
for calling SendHandler.onResult() when there is a chance that the current 
thread also initiated the write

Modified:
    tomcat/tc7.0.x/trunk/   (props changed)
    tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/Constants.java
    
tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/WsContextListener.java
    
tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/WsHttpUpgradeHandler.java
    
tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java
    
tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/WsServerContainer.java
    
tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/WsWriteTimeout.java
    tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml
    tomcat/tc7.0.x/trunk/webapps/docs/web-socket-howto.xml

Propchange: tomcat/tc7.0.x/trunk/
------------------------------------------------------------------------------
  Merged /tomcat/trunk:r1537041

Modified: 
tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/Constants.java
URL: 
http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/Constants.java?rev=1537046&r1=1537045&r2=1537046&view=diff
==============================================================================
--- tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/Constants.java 
(original)
+++ tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/Constants.java 
Wed Oct 30 11:24:12 2013
@@ -31,6 +31,14 @@ public class Constants {
     public static final String 
ENFORCE_NO_ADD_AFTER_HANDSHAKE_CONTEXT_INIT_PARAM =
             "org.apache.tomcat.websocket.noAddAfterHandshake";
 
+    // Executor configuration
+    public static final String EXECUTOR_CORE_SIZE_INIT_PARAM =
+            "org.apache.tomcat.websocket.executorCoreSize";
+    public static final String EXECUTOR_MAX_SIZE_INIT_PARAM =
+            "org.apache.tomcat.websocket.executorMaxSize";
+    public static final String EXECUTOR_KEEPALIVETIME_SECONDS_INIT_PARAM =
+            "org.apache.tomcat.websocket.executorKeepAliveTimeSeconds";
+
     public static final String SERVER_CONTAINER_SERVLET_CONTEXT_ATTRIBUTE =
             "javax.websocket.server.ServerContainer";
 

Modified: 
tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/WsContextListener.java
URL: 
http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/WsContextListener.java?rev=1537046&r1=1537045&r2=1537046&view=diff
==============================================================================
--- 
tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/WsContextListener.java
 (original)
+++ 
tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/WsContextListener.java
 Wed Oct 30 11:24:12 2013
@@ -45,6 +45,7 @@ public class WsContextListener implement
         ServletContext sc = sce.getServletContext();
         Object obj = 
sc.getAttribute(Constants.SERVER_CONTAINER_SERVLET_CONTEXT_ATTRIBUTE);
         if (obj instanceof WsServerContainer) {
+            ((WsServerContainer) obj).shutdownExecutor();
             ((WsServerContainer) obj).destroy();
         }
     }

Modified: 
tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/WsHttpUpgradeHandler.java
URL: 
http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/WsHttpUpgradeHandler.java?rev=1537046&r1=1537045&r2=1537046&view=diff
==============================================================================
--- 
tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/WsHttpUpgradeHandler.java
 (original)
+++ 
tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/WsHttpUpgradeHandler.java
 Wed Oct 30 11:24:12 2013
@@ -233,7 +233,9 @@ public class WsHttpUpgradeHandler implem
 
         @Override
         public void onWritePossible() {
-            wsRemoteEndpointServer.onWritePossible();
+            // Triggered by the poller so this isn't the same thread that
+            // triggered the write so no need for a dispatch
+            wsRemoteEndpointServer.onWritePossible(false);
         }
 
 

Modified: 
tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java
URL: 
http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java?rev=1537046&r1=1537045&r2=1537046&view=diff
==============================================================================
--- 
tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java
 (original)
+++ 
tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java
 Wed Oct 30 11:24:12 2013
@@ -20,6 +20,9 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
 
 import javax.websocket.SendHandler;
 import javax.websocket.SendResult;
@@ -42,8 +45,12 @@ public class WsRemoteEndpointImplServer 
     private static final Log log =
             LogFactory.getLog(WsHttpUpgradeHandler.class);
 
+    private static final Queue<OnResultRunnable> onResultRunnables =
+            new ConcurrentLinkedQueue<OnResultRunnable>();
+
     private final AbstractServletOutputStream sos;
     private final WsWriteTimeout wsWriteTimeout;
+    private final ExecutorService executorService;
     private volatile SendHandler handler = null;
     private volatile ByteBuffer[] buffers = null;
 
@@ -55,6 +62,7 @@ public class WsRemoteEndpointImplServer 
             WsServerContainer serverContainer) {
         this.sos = sos;
         this.wsWriteTimeout = serverContainer.getTimeout();
+        this.executorService = serverContainer.getExecutorService();
     }
 
 
@@ -68,11 +76,13 @@ public class WsRemoteEndpointImplServer 
     protected void doWrite(SendHandler handler, ByteBuffer... buffers) {
         this.handler = handler;
         this.buffers = buffers;
-        onWritePossible();
+        // This is definitely the same thread that triggered the write so a
+        // dispatch will be required.
+        onWritePossible(true);
     }
 
 
-    public void onWritePossible() {
+    public void onWritePossible(boolean useDispatch) {
         boolean complete = true;
         try {
             // If this is false there will be a call back when it is true
@@ -89,7 +99,7 @@ public class WsRemoteEndpointImplServer 
                 }
                 if (complete) {
                     wsWriteTimeout.unregister(this);
-                    clearHandler(null);
+                    clearHandler(null, useDispatch);
                     if (close) {
                         close();
                     }
@@ -99,7 +109,7 @@ public class WsRemoteEndpointImplServer 
 
         } catch (IOException ioe) {
             wsWriteTimeout.unregister(this);
-            clearHandler(ioe);
+            clearHandler(ioe, useDispatch);
             close();
         }
         if (!complete) {
@@ -118,7 +128,11 @@ public class WsRemoteEndpointImplServer 
     @Override
     protected void doClose() {
         if (handler != null) {
-            clearHandler(new EOFException());
+            // close() can be triggered by a wide range of scenarios. It is far
+            // simpler just to always use a dispatch that it is to try and 
track
+            // whether or not this method was called by the same thread that
+            // triggered the write
+            clearHandler(new EOFException(), true);
         }
         try {
             sos.close();
@@ -136,15 +150,31 @@ public class WsRemoteEndpointImplServer 
     }
 
 
-    protected void onTimeout() {
+    /*
+     * Currently this is only called from the background thread so we could 
just
+     * call clearHandler() with useDispatch == false but the method parameter
+     * was added in case other callers started to use this method to make sure
+     * that those callers think through what the correct value of useDispatch 
is
+     * for them.
+     */
+    protected void onTimeout(boolean useDispatch) {
         if (handler != null) {
-            clearHandler(new SocketTimeoutException());
+            clearHandler(new SocketTimeoutException(), useDispatch);
         }
         close();
     }
 
 
-    private void clearHandler(Throwable t) {
+    /**
+     *
+     * @param t             The throwable associated with any error that
+     *                      occurred
+     * @param useDispatch   Should {@link SendHandler#onResult(SendResult)} be
+     *                      called from a new thread, keeping in mind the
+     *                      requirements of
+     *                      {@link javax.websocket.RemoteEndpoint.Async}
+     */
+    private void clearHandler(Throwable t, boolean useDispatch) {
         // Setting the result marks this (partial) message as
         // complete which means the next one may be sent which
         // could update the value of the handler. Therefore, keep a
@@ -153,11 +183,64 @@ public class WsRemoteEndpointImplServer 
         SendHandler sh = handler;
         handler = null;
         if (sh != null) {
+            if (useDispatch) {
+                OnResultRunnable r = onResultRunnables.poll();
+                if (r == null) {
+                    r = new OnResultRunnable(onResultRunnables);
+                }
+                r.init(sh, t);
+                if (executorService == null || executorService.isShutdown()) {
+                    // Can't use the executor so call the runnable directly.
+                    // This may not be strictly specification compliant in all
+                    // cases but during shutdown only close messages are going
+                    // to be sent so there should not be the issue of nested
+                    // calls leading to stack overflow as described in bug
+                    // 55715. The issues with nested calls was the reason for
+                    // the separate thread requirement in the specification.
+                    r.run();
+                } else {
+                    executorService.execute(r);
+                }
+            } else {
+                if (t == null) {
+                    sh.onResult(new SendResult());
+                } else {
+                    sh.onResult(new SendResult(t));
+                }
+            }
+        }
+    }
+
+
+    private static class OnResultRunnable implements Runnable {
+
+        private final Queue<OnResultRunnable> queue;
+
+        private volatile SendHandler sh;
+        private volatile Throwable t;
+
+        private OnResultRunnable(Queue<OnResultRunnable> queue) {
+            this.queue = queue;
+        }
+
+        private void init(SendHandler sh, Throwable t) {
+            this.sh = sh;
+            this.t = t;
+        }
+
+        @Override
+        public void run() {
             if (t == null) {
                 sh.onResult(new SendResult());
             } else {
                 sh.onResult(new SendResult(t));
             }
+            t = null;
+            sh = null;
+            // Return the Runnable to the queue when it has been finished with
+            // Note if this method takes an age to finish there shouldn't be 
any
+            // thread safety issues as the fields are cleared above.
+            queue.add(this);
         }
     }
 }

Modified: 
tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/WsServerContainer.java
URL: 
http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/WsServerContainer.java?rev=1537046&r1=1537045&r2=1537046&view=diff
==============================================================================
--- 
tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/WsServerContainer.java
 (original)
+++ 
tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/WsServerContainer.java
 Wed Oct 30 11:24:12 2013
@@ -26,6 +26,12 @@ import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import javax.servlet.DispatcherType;
 import javax.servlet.FilterRegistration;
@@ -81,6 +87,7 @@ public class WsServerContainer extends W
     private volatile boolean addAllowed = true;
     private final ConcurrentHashMap<String,Set<WsSession>> 
authenticatedSessions =
             new ConcurrentHashMap<String, Set<WsSession>>();
+    private final ExecutorService executorService;
 
     WsServerContainer(ServletContext servletContext) {
 
@@ -104,6 +111,25 @@ public class WsServerContainer extends W
         if (value != null) {
             setEnforceNoAddAfterHandshake(Boolean.parseBoolean(value));
         }
+        // Executor config
+        int executorCoreSize = 0;
+        int executorMaxSize = 10;
+        long executorKeepAliveTimeSeconds = 60;
+        value = servletContext.getInitParameter(
+                Constants.EXECUTOR_CORE_SIZE_INIT_PARAM);
+        if (value != null) {
+            executorCoreSize = Integer.parseInt(value);
+        }
+        value = servletContext.getInitParameter(
+                Constants.EXECUTOR_MAX_SIZE_INIT_PARAM);
+        if (value != null) {
+            executorMaxSize = Integer.parseInt(value);
+        }
+        value = servletContext.getInitParameter(
+                Constants.EXECUTOR_KEEPALIVETIME_SECONDS_INIT_PARAM);
+        if (value != null) {
+            executorKeepAliveTimeSeconds = Long.parseLong(value);
+        }
 
         FilterRegistration.Dynamic fr = servletContext.addFilter(
                 WsFilter.class.getName(), new WsFilter());
@@ -113,6 +139,22 @@ public class WsServerContainer extends W
                 DispatcherType.FORWARD);
 
         fr.addMappingForUrlPatterns(types, true, "/*");
+
+        // Use a per web application executor for any threads the the WebSocket
+        // server code needs to create. Group all of the threads under a single
+        // ThreadGroup.
+        StringBuffer threadGroupName = new StringBuffer("WebSocketServer-");
+        if ("".equals(servletContext.getContextPath())) {
+            threadGroupName.append("ROOT");
+        } else {
+            threadGroupName.append(servletContext.getContextPath());
+        }
+        ThreadGroup threadGroup = new ThreadGroup(threadGroupName.toString());
+        WsThreadFactory wsThreadFactory = new WsThreadFactory(threadGroup);
+
+        executorService = new ThreadPoolExecutor(executorCoreSize,
+                executorMaxSize, executorKeepAliveTimeSeconds, 
TimeUnit.SECONDS,
+                new LinkedBlockingQueue<Runnable>(), wsThreadFactory);
     }
 
 
@@ -383,6 +425,21 @@ public class WsServerContainer extends W
         }
     }
 
+
+    ExecutorService getExecutorService() {
+        return executorService;
+    }
+
+
+    void shutdownExecutor() {
+        executorService.shutdown();
+        try {
+            executorService.awaitTermination(10, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            // Ignore the interruption and carry on
+        }
+    }
+
     private static void validateEncoders(Class<? extends Encoder>[] encoders)
             throws DeploymentException {
 
@@ -403,6 +460,7 @@ public class WsServerContainer extends W
         }
     }
 
+
     private static class TemplatePathMatch {
         private final ServerEndpointConfig config;
         private final UriTemplate uriTemplate;
@@ -449,4 +507,22 @@ public class WsServerContainer extends W
                     tpm2.getUriTemplate().getNormalizedPath());
         }
     }
+
+
+    private static class WsThreadFactory implements ThreadFactory {
+
+        private final ThreadGroup tg;
+        private final AtomicLong count = new AtomicLong(0);
+
+        private WsThreadFactory(ThreadGroup tg) {
+            this.tg = tg;
+        }
+
+        @Override
+        public Thread newThread(Runnable r) {
+            Thread t = new Thread(tg, r);
+            t.setName(tg.getName() + "-" + count.incrementAndGet());
+            return t;
+        }
+    }
 }

Modified: 
tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/WsWriteTimeout.java
URL: 
http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/WsWriteTimeout.java?rev=1537046&r1=1537045&r2=1537046&view=diff
==============================================================================
--- 
tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/WsWriteTimeout.java
 (original)
+++ 
tomcat/tc7.0.x/trunk/java/org/apache/tomcat/websocket/server/WsWriteTimeout.java
 Wed Oct 30 11:24:12 2013
@@ -52,7 +52,9 @@ public class WsWriteTimeout implements B
             while (iter.hasNext()) {
                 WsRemoteEndpointImplServer endpoint = iter.next();
                 if (endpoint.getTimeoutExpiry() < now) {
-                    endpoint.onTimeout();
+                    // Background thread, not the thread that triggered the
+                    // write so no need to use a dispatch
+                    endpoint.onTimeout(false);
                 } else {
                     // Endpoints are ordered by timeout expiry so if this point
                     // is reached there is no need to check the remaining

Modified: tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml
URL: 
http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml?rev=1537046&r1=1537045&r2=1537046&view=diff
==============================================================================
--- tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml (original)
+++ tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml Wed Oct 30 11:24:12 2013
@@ -100,6 +100,12 @@
         Define the web-fragment.xml in tomcat7-websocket.jar as a Servlet 3.0
         web fragment rather than as a Servlet 3.1 web fragment. (markt) 
       </fix>
+      <fix>
+        <bug>55715</bug>: Add a per web application executor to the WebSocket
+        implementation and use it for calling
+        <code>SendHandler.onResult()</code> when there is a chance that the
+        current thread also initiated the write. (markt)
+      </fix>
     </changelog>
   </subsection>
   <subsection name="Coyote">

Modified: tomcat/tc7.0.x/trunk/webapps/docs/web-socket-howto.xml
URL: 
http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/webapps/docs/web-socket-howto.xml?rev=1537046&r1=1537045&r2=1537046&view=diff
==============================================================================
--- tomcat/tc7.0.x/trunk/webapps/docs/web-socket-howto.xml (original)
+++ tomcat/tc7.0.x/trunk/webapps/docs/web-socket-howto.xml Wed Oct 30 11:24:12 
2013
@@ -85,6 +85,25 @@
    property to <code>true</code> but any explicit setting on the servlet 
context
    will always take priority.</p>
 
+<p>The Java WebSocket 1.0 specification requires that call backs for
+   asynchronous writes are performed on a different thread to the thread that
+   initiated the write. Since the container thread pool is not exposed via the
+   Servlet API, the WebSocket implementation has to provide its own thread 
pool.
+   This thread pool is controlled by the following servlet context
+   initialization parameters:<a>
+   <ul>
+     <li><code>org.apache.tomcat.websocket.executorCoreSize</code>: The core
+         size of the executor thread pool. If not set, the default of 0 (zero)
+         is used.</li>
+     <li><code>org.apache.tomcat.websocket.executorMaxSize</code>: The maximum
+         permitted size of the executor thread pool. If not set, the default of
+         10 is used.</li>
+     <li><code>org.apache.tomcat.websocket.executorKeepAliveTimeSeconds</code>:
+         The maximum time an idle thread will remain in the executor thread 
pool
+         until it is terminated. If not specified, the default of 60 seconds is
+         used.</li>
+   </ul>
+
 <p>When using the WebSocket client to connect to server endpoints, the timeout
    for IO operations while establishing the connection is controlled by the
    <code>userProperties</code> of the provided



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to