Author: markt
Date: Thu Sep 19 10:56:51 2013
New Revision: 1524687

URL: http://svn.apache.org/r1524687
Log:
More robust solution to the problem of blocking writes not be closed when the 
web application stops. Futures used for blocking writes are registered with the 
session and the session completes them with an exception if they are 
outstanding when the session closes.

Modified:
    tomcat/trunk/java/org/apache/tomcat/websocket/FutureToSendHandler.java
    tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties
    tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
    tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java
    tomcat/trunk/java/org/apache/tomcat/websocket/WsWebSocketContainer.java
    tomcat/trunk/java/org/apache/tomcat/websocket/server/WsContextListener.java
    tomcat/trunk/java/org/apache/tomcat/websocket/server/WsSci.java

Modified: tomcat/trunk/java/org/apache/tomcat/websocket/FutureToSendHandler.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/FutureToSendHandler.java?rev=1524687&r1=1524686&r2=1524687&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/FutureToSendHandler.java 
(original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/FutureToSendHandler.java Thu 
Sep 19 10:56:51 2013
@@ -31,12 +31,19 @@ import javax.websocket.SendResult;
 class FutureToSendHandler implements Future<Void>, SendHandler {
 
     private final CountDownLatch latch = new CountDownLatch(1);
+    private final WsSession wsSession;
     private volatile SendResult result = null;
 
+    public FutureToSendHandler(WsSession wsSession) {
+        this.wsSession = wsSession;
+    }
+
+
     // --------------------------------------------------------- SendHandler
 
     @Override
     public void onResult(SendResult result) {
+
         this.result = result;
         latch.countDown();
     }
@@ -64,7 +71,12 @@ class FutureToSendHandler implements Fut
     @Override
     public Void get() throws InterruptedException,
             ExecutionException {
-        latch.await();
+        try {
+            wsSession.registerFuture(this);
+            latch.await();
+        } finally {
+            wsSession.unregisterFuture(this);
+        }
         if (result.getException() != null) {
             throw new ExecutionException(result.getException());
         }
@@ -75,7 +87,14 @@ class FutureToSendHandler implements Fut
     public Void get(long timeout, TimeUnit unit)
             throws InterruptedException, ExecutionException,
             TimeoutException {
-        boolean retval = latch.await(timeout, unit);
+        boolean retval = false;
+        try {
+            wsSession.registerFuture(this);
+            retval = latch.await(timeout, unit);
+        } finally {
+            wsSession.unregisterFuture(this);
+
+        }
         if (retval == false) {
             throw new TimeoutException();
         }

Modified: tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties?rev=1524687&r1=1524686&r2=1524687&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties 
(original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties Thu 
Sep 19 10:56:51 2013
@@ -71,12 +71,19 @@ wsSession.closed=The WebSocket session h
 wsSession.duplicateHandlerBinary=A binary message handler has already been 
configured
 wsSession.duplicateHandlerPong=A pong message handler has already been 
configured
 wsSession.duplicateHandlerText=A text message handler has already been 
configured
-wsSession.sendCloseFail=Failed to send close message to remote endpoint
 wsSession.invalidHandlerTypePong=A pong message handler must implement 
MessageHandler.Basic
+wsSession.messageFailed=Unable to write the complete message as the WebSocket 
connection has been closed
+wsSession.sendCloseFail=Failed to send close message to remote endpoint
 wsSession.removeHandlerFailed=Unable to remove the handler [{0}] as it was not 
registered with this session
 wsSession.unknownHandler=Unable to add the message handler [{0}] as it was for 
the unrecognised type [{1}]
 wsSession.unknownHandlerType=Unable to add the message handler [{0}] as it was 
wrapped as the unrecognised type [{1}]
 
+# Note the following message is used as a close reason in a WebSocket control
+# frame and therefore must be 123 bytes (not characters) or less in length.
+# Messages are encoded using UTF-8 where a single character may be encoded in
+# as many as 4 bytes.
+wsWebSocketContainer.shutdown=The web application is stopping
+
 wsWebSocketContainer.asynchronousChannelGroupFail=Unable to create dedicated 
AsynchronousChannelGroup for WebSocket clients which is required to prevent 
memory leaks in complex class loader environments like J2EE containers
 wsWebSocketContainer.asynchronousSocketChannelFail=Unable to open a connection 
to the server
 wsWebSocketContainer.defaultConfiguratorFaill=Failed to create the default 
configurator
@@ -90,4 +97,5 @@ wsWebSocketContainer.maxBuffer=This impl
 wsWebSocketContainer.missingAnnotation=Cannot use POJO class [{0}] as it is 
not annotated with @ClientEndpoint
 wsWebSocketContainer.pathNoHost=No host was specified in URI
 wsWebSocketContainer.pathWrongScheme=The scheme [{0}] is not supported
+wsWebSocketContainer.sessionCloseFail=Session with ID [{0}] did not close 
cleanly
 wsWebSocketContainer.sslEngineFail=Unable to create SSLEngine to support 
SSL/TLS connections

Modified: 
tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java?rev=1524687&r1=1524686&r2=1524687&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java 
(original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java 
Thu Sep 19 10:56:51 2013
@@ -119,7 +119,7 @@ public abstract class WsRemoteEndpointIm
 
 
     public Future<Void> sendBytesByFuture(ByteBuffer data) {
-        FutureToSendHandler f2sh = new FutureToSendHandler();
+        FutureToSendHandler f2sh = new FutureToSendHandler(wsSession);
         sendBytesByCompletion(data, f2sh);
         return f2sh;
     }
@@ -156,7 +156,7 @@ public abstract class WsRemoteEndpointIm
 
 
     public Future<Void> sendStringByFuture(String text) {
-        FutureToSendHandler f2sh = new FutureToSendHandler();
+        FutureToSendHandler f2sh = new FutureToSendHandler(wsSession);
         sendStringByCompletion(text, f2sh);
         return f2sh;
     }
@@ -191,7 +191,7 @@ public abstract class WsRemoteEndpointIm
             // trigger a session close and depending on timing the client
             // session may close before we can read the timeout.
             long timeout = getBlockingSendTimeout();
-            FutureToSendHandler f2sh = new FutureToSendHandler();
+            FutureToSendHandler f2sh = new FutureToSendHandler(wsSession);
             TextMessageSendHandler tmsh = new TextMessageSendHandler(f2sh, 
part,
                     last, encoder, encoderBuffer, this);
             tmsh.write();
@@ -213,7 +213,7 @@ public abstract class WsRemoteEndpointIm
         // trigger a session close and depending on timing the client
         // session may close before we can read the timeout.
         long timeout = getBlockingSendTimeout();
-        FutureToSendHandler f2sh = new FutureToSendHandler();
+        FutureToSendHandler f2sh = new FutureToSendHandler(wsSession);
         startMessage(opCode, payload, last, f2sh);
         try {
             if (timeout == -1) {
@@ -448,7 +448,7 @@ public abstract class WsRemoteEndpointIm
     }
 
     public Future<Void> sendObjectByFuture(Object obj) {
-        FutureToSendHandler f2sh = new FutureToSendHandler();
+        FutureToSendHandler f2sh = new FutureToSendHandler(wsSession);
         sendObjectByCompletion(obj, f2sh);
         return f2sh;
     }

Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java?rev=1524687&r1=1524686&r2=1524687&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java Thu Sep 19 
10:56:51 2013
@@ -38,6 +38,7 @@ import javax.websocket.Extension;
 import javax.websocket.MessageHandler;
 import javax.websocket.PongMessage;
 import javax.websocket.RemoteEndpoint;
+import javax.websocket.SendResult;
 import javax.websocket.Session;
 import javax.websocket.WebSocketContainer;
 
@@ -92,6 +93,7 @@ public class WsSession implements Sessio
             Constants.DEFAULT_BUFFER_SIZE;
     private volatile long maxIdleTimeout = 0;
     private volatile long lastActive = System.currentTimeMillis();
+    private Map<FutureToSendHandler,FutureToSendHandler> futures = new 
ConcurrentHashMap<>();
 
     /**
      * Creates a new WebSocket session for communication between the two
@@ -415,6 +417,12 @@ public class WsSession implements Sessio
 
             state = State.CLOSED;
         }
+
+        IOException ioe = new 
IOException(sm.getString("wsSession.messageFailed"));
+        SendResult sr = new SendResult(ioe);
+        for (FutureToSendHandler f2sh : futures.keySet()) {
+            f2sh.onResult(sr);
+        }
     }
 
 
@@ -510,6 +518,25 @@ public class WsSession implements Sessio
         }
     }
 
+
+    /**
+     * Make the session aware of a {@link FutureToSendHandler} that will need 
to
+     * be forcibly closed if the session closes before the
+     * {@link FutureToSendHandler} completes.
+     */
+    protected void registerFuture(FutureToSendHandler f2sh) {
+        futures.put(f2sh, f2sh);
+    }
+
+
+    /**
+     * Remove a {@link FutureToSendHandler} from the set of tracked instances.
+     */
+    protected void unregisterFuture(FutureToSendHandler f2sh) {
+        futures.remove(f2sh);
+    }
+
+
     @Override
     public URI getRequestURI() {
         checkState();

Modified: 
tomcat/trunk/java/org/apache/tomcat/websocket/WsWebSocketContainer.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsWebSocketContainer.java?rev=1524687&r1=1524686&r2=1524687&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/WsWebSocketContainer.java 
(original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/WsWebSocketContainer.java Thu 
Sep 19 10:56:51 2013
@@ -55,6 +55,8 @@ import javax.net.ssl.SSLException;
 import javax.net.ssl.TrustManagerFactory;
 import javax.websocket.ClientEndpoint;
 import javax.websocket.ClientEndpointConfig;
+import javax.websocket.CloseReason;
+import javax.websocket.CloseReason.CloseCodes;
 import javax.websocket.DeploymentException;
 import javax.websocket.Endpoint;
 import javax.websocket.Extension;
@@ -740,6 +742,27 @@ public class WsWebSocketContainer
         this.defaultAsyncTimeout = timeout;
     }
 
+
+    /**
+     * Cleans up the resources still in use by WebSocket sessions created from
+     * this container. This includes closing sessions and cancelling
+     * {@link Future}s associated with blocking read/writes.
+     */
+    public void destroy() {
+        CloseReason cr = new CloseReason(
+                CloseCodes.GOING_AWAY, 
sm.getString("wsWebSocketContainer.shutdown"));
+
+        for (WsSession session : sessions.keySet()) {
+            try {
+                session.close(cr);
+            } catch (IOException ioe) {
+                log.debug(sm.getString(
+                        "wsWebSocketContainer.sessionCloseFail", 
session.getId()), ioe);
+            }
+        }
+    }
+
+
     // ----------------------------------------------- BackgroundProcess 
methods
 
     @Override

Modified: 
tomcat/trunk/java/org/apache/tomcat/websocket/server/WsContextListener.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/WsContextListener.java?rev=1524687&r1=1524686&r2=1524687&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/server/WsContextListener.java 
(original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/server/WsContextListener.java 
Thu Sep 19 10:56:51 2013
@@ -21,10 +21,11 @@ import javax.servlet.ServletContextEvent
 import javax.servlet.ServletContextListener;
 
 /**
- * In normal usage, this {@link ServletContextListener} is not required as the
- * {@link WsSci} performs all the necessary bootstrap. If the {@link WsSci} is
- * disabled, this listener must be added manually to every
- * {@link javax.servlet.ServletContext} that uses WebSocket to bootstrap the
+ * In normal usage, this {@link ServletContextListener} does not need to be
+ * explicitly configured as the {@link WsSci} performs all the necessary
+ * bootstrap and installs this listener in the {@link ServletContext}. If the
+ * {@link WsSci} is disabled, this listener must be added manually to every
+ * {@link ServletContext} that uses WebSocket to bootstrap the
  * {@link WsServerContainer} correctly.
  */
 public class WsContextListener implements ServletContextListener {
@@ -35,12 +36,16 @@ public class WsContextListener implement
         // Don't trigger WebSocket initialization if a WebSocket Server
         // Container is already present
         if 
(sc.getAttribute(Constants.SERVER_CONTAINER_SERVLET_CONTEXT_ATTRIBUTE) == null) 
{
-            WsSci.init(sce.getServletContext());
+            WsSci.init(sce.getServletContext(), false);
         }
     }
 
     @Override
     public void contextDestroyed(ServletContextEvent sce) {
-        // NOOP
+        ServletContext sc = sce.getServletContext();
+        Object obj = 
sc.getAttribute(Constants.SERVER_CONTAINER_SERVLET_CONTEXT_ATTRIBUTE);
+        if (obj instanceof WsServerContainer) {
+            ((WsServerContainer) obj).destroy();
+        }
     }
 }

Modified: tomcat/trunk/java/org/apache/tomcat/websocket/server/WsSci.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/WsSci.java?rev=1524687&r1=1524686&r2=1524687&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/server/WsSci.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/server/WsSci.java Thu Sep 19 
10:56:51 2013
@@ -44,7 +44,7 @@ public class WsSci implements ServletCon
     public void onStartup(Set<Class<?>> clazzes, ServletContext ctx)
             throws ServletException {
 
-        WsServerContainer sc = init(ctx);
+        WsServerContainer sc = init(ctx, true);
 
         if (clazzes == null || clazzes.size() == 0) {
             return;
@@ -125,7 +125,8 @@ public class WsSci implements ServletCon
     }
 
 
-    static WsServerContainer init(ServletContext servletContext) {
+    static WsServerContainer init(ServletContext servletContext,
+            boolean initBySciMechanism) {
 
         WsServerContainer sc = new WsServerContainer(servletContext);
 
@@ -133,6 +134,11 @@ public class WsSci implements ServletCon
                 Constants.SERVER_CONTAINER_SERVLET_CONTEXT_ATTRIBUTE, sc);
 
         servletContext.addListener(new WsSessionListener(sc));
+        // Can't register the ContextListener again if the ContextListener is
+        // calling this method
+        if (initBySciMechanism) {
+            servletContext.addListener(new WsContextListener());
+        }
 
         return sc;
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to