Author: markt
Date: Thu Feb 14 14:50:48 2013
New Revision: 1446217

URL: http://svn.apache.org/r1446217
Log:
Implement session timeouts
Use a single background thread for session timeouts and server side async writes

Added:
    tomcat/trunk/java/org/apache/tomcat/websocket/BackgroundProcess.java   
(with props)
    tomcat/trunk/java/org/apache/tomcat/websocket/BackgroundProcessManager.java 
  (with props)
    tomcat/trunk/java/org/apache/tomcat/websocket/server/WsWriteTimeout.java
      - copied, changed from r1446080, 
tomcat/trunk/java/org/apache/tomcat/websocket/server/WsTimeout.java
Removed:
    tomcat/trunk/java/org/apache/tomcat/websocket/server/WsTimeout.java
Modified:
    tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties
    tomcat/trunk/java/org/apache/tomcat/websocket/WsFrameBase.java
    tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointBase.java
    tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java
    tomcat/trunk/java/org/apache/tomcat/websocket/WsWebSocketContainer.java
    
tomcat/trunk/java/org/apache/tomcat/websocket/server/ServerContainerImpl.java
    tomcat/trunk/java/org/apache/tomcat/websocket/server/WsListener.java
    
tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointServer.java
    tomcat/trunk/test/org/apache/tomcat/websocket/TestWsWebSocketContainer.java

Added: tomcat/trunk/java/org/apache/tomcat/websocket/BackgroundProcess.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/BackgroundProcess.java?rev=1446217&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/BackgroundProcess.java (added)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/BackgroundProcess.java Thu 
Feb 14 14:50:48 2013
@@ -0,0 +1,26 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.tomcat.websocket;
+
+public interface BackgroundProcess {
+
+    void backgroundProcess();
+
+    void setProcessPeriod(int period);
+
+    int getProcessPeriod();
+}

Propchange: tomcat/trunk/java/org/apache/tomcat/websocket/BackgroundProcess.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
tomcat/trunk/java/org/apache/tomcat/websocket/BackgroundProcessManager.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/BackgroundProcessManager.java?rev=1446217&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/BackgroundProcessManager.java 
(added)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/BackgroundProcessManager.java 
Thu Feb 14 14:50:48 2013
@@ -0,0 +1,119 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.tomcat.websocket;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.tomcat.util.ExceptionUtils;
+
+/**
+ * Provides a background processing mechanism that triggers roughly once a
+ * second. The class maintains a thread that only runs when there is at least
+ * one instance of {@link BackgroundProcess} registered.
+ */
+public class BackgroundProcessManager {
+
+    private static final BackgroundProcessManager instance;
+
+
+    static {
+        instance = new BackgroundProcessManager();
+    }
+
+
+    public static BackgroundProcessManager getInstance() {
+        return instance;
+    }
+
+    private final Set<BackgroundProcess> processes = new HashSet<>();
+    private final Object processesLock = new Object();
+    private WsBackgroundThread wsBackgroundThread = null;
+
+    private BackgroundProcessManager() {
+        // Hide default constructor
+    }
+
+
+    public void register(BackgroundProcess process) {
+        synchronized (processesLock) {
+            if (processes.size() == 0) {
+                wsBackgroundThread = new WsBackgroundThread(this);
+                wsBackgroundThread.setDaemon(true);
+                wsBackgroundThread.start();
+            }
+            processes.add(process);
+        }
+    }
+
+
+    public void unregister(BackgroundProcess process) {
+        synchronized (processesLock) {
+            processes.remove(process);
+            if (processes.size() == 0) {
+                wsBackgroundThread.halt();
+                wsBackgroundThread = null;
+            }
+        }
+    }
+
+
+    private void process() {
+        Set<BackgroundProcess> currentProcesses = new HashSet<>();
+        synchronized (processesLock) {
+            currentProcesses.addAll(processes);
+        }
+        for (BackgroundProcess process : currentProcesses) {
+            try {
+                process.backgroundProcess();
+            } catch (Throwable t) {
+                ExceptionUtils.handleThrowable(t);
+                // Ignore anything else
+                // TODO Log this
+            }
+        }
+    }
+
+
+    private static class WsBackgroundThread extends Thread {
+
+        private final BackgroundProcessManager manager;
+        private volatile boolean running = true;
+
+        public WsBackgroundThread(BackgroundProcessManager manager) {
+            setName("WebSocket background processing");
+            this.manager = manager;
+        }
+
+        @Override
+        public void run() {
+            while (running) {
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    // Ignore
+                }
+                manager.process();
+            }
+        }
+
+        public void halt() {
+            setName("WebSocket background processing - stopping");
+            running = false;
+        }
+    }
+}

Propchange: 
tomcat/trunk/java/org/apache/tomcat/websocket/BackgroundProcessManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties?rev=1446217&r1=1446216&r2=1446217&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties 
(original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties Thu 
Feb 14 14:50:48 2013
@@ -36,6 +36,11 @@ wsRemoteEndpoint.changeType=When sending
 wsRemoteEndpoint.concurrentMessageSend=Messages may not be sent concurrently 
even when using the asynchronous send messages. The client must wait for the 
previous message to complete before sending the next.
 wsRemoteEndpoint.inProgress=Message will not be sent because the WebSocket 
session is currently sending another message
 
+# Note the following message is used as a close reason in a WebSocket control
+# frame and therefore must be 123 bytes (not characters) or less in length.
+# Messages are encoded using UTF-8 where a single character may be encoded in
+# as many as 4 bytes.
+wsSession.timeout=The WebSocket session timeout expired
 wsSession.duplicateHandlerBinary=A binary message handler has already been 
configured
 wsSession.duplicateHandlerPong=A pong message handler has already been 
configured
 wsSession.duplicateHandlerText=A text message handler has already been 
configured

Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsFrameBase.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsFrameBase.java?rev=1446217&r1=1446216&r2=1446217&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/WsFrameBase.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/WsFrameBase.java Thu Feb 14 
14:50:48 2013
@@ -90,6 +90,8 @@ public abstract class WsFrameBase {
 
     protected void processInputBuffer() throws IOException {
         while (true) {
+            wsSession.updateLastActive();
+
             if (state == State.NEW_FRAME) {
                 if (!processInitialHeader()) {
                     break;

Modified: 
tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointBase.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointBase.java?rev=1446217&r1=1446216&r2=1446217&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointBase.java 
(original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointBase.java Thu 
Feb 14 14:50:48 2013
@@ -64,6 +64,7 @@ public abstract class WsRemoteEndpointBa
     private final ByteBuffer encoderBuffer = ByteBuffer.allocate(8192);
     private AtomicBoolean batchingAllowed = new AtomicBoolean(false);
     private volatile long asyncSendTimeout = -1;
+    private WsSession wsSession;
 
 
     @Override
@@ -226,6 +227,9 @@ public abstract class WsRemoteEndpointBa
 
     void startMessage(byte opCode, ByteBuffer payload, boolean last,
             SendHandler handler) {
+
+        wsSession.updateLastActive();
+
         MessagePart mp = new MessagePart(opCode, payload, last, handler, this);
 
         synchronized (messagePartLock) {
@@ -272,6 +276,8 @@ public abstract class WsRemoteEndpointBa
             }
         }
 
+        wsSession.updateLastActive();
+
         handler.setResult(result);
     }
 
@@ -459,7 +465,9 @@ public abstract class WsRemoteEndpointBa
     }
 
 
-
+    protected void setSession(WsSession wsSession) {
+        this.wsSession = wsSession;
+    }
 
 
     protected abstract void doWrite(SendHandler handler, ByteBuffer... data);

Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java?rev=1446217&r1=1446216&r2=1446217&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java Thu Feb 14 
14:50:48 2013
@@ -64,6 +64,7 @@ public class WsSession implements Sessio
     private volatile int maxTextMessageBufferSize =
             Constants.DEFAULT_BUFFER_SIZE;
     private volatile long sessionIdleTimeout = 0;
+    private volatile long lastActive = System.currentTimeMillis();
 
 
     /**
@@ -80,6 +81,7 @@ public class WsSession implements Sessio
             WsWebSocketContainer wsWebSocketContainer) {
         this.localEndpoint = localEndpoint;
         this.wsRemoteEndpoint = wsRemoteEndpoint;
+        this.wsRemoteEndpoint.setSession(this);
         this.webSocketContainer = wsWebSocketContainer;
         applicationClassLoader = 
Thread.currentThread().getContextClassLoader();
         wsRemoteEndpoint.setAsyncSendTimeout(
@@ -366,6 +368,26 @@ public class WsSession implements Sessio
     }
 
 
+    protected void updateLastActive() {
+        lastActive = System.currentTimeMillis();
+    }
+
+    protected void expire() {
+        long timeout = sessionIdleTimeout;
+        if (timeout < 1) {
+            return;
+        }
+
+        if (System.currentTimeMillis() - lastActive > timeout) {
+            try {
+                close(new CloseReason(CloseCodes.GOING_AWAY,
+                        sm.getString("wsSession.timeout")));
+            } catch (IOException e) {
+                // TODO Log this?
+            }
+        }
+    }
+
     // Protected so unit tests can use it
     protected static Class<?> getMessageType(MessageHandler listener) {
         return (Class<?>) getMessageType(listener.getClass());

Modified: 
tomcat/trunk/java/org/apache/tomcat/websocket/WsWebSocketContainer.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsWebSocketContainer.java?rev=1446217&r1=1446216&r2=1446217&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/WsWebSocketContainer.java 
(original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/WsWebSocketContainer.java Thu 
Feb 14 14:50:48 2013
@@ -33,6 +33,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
@@ -47,7 +48,8 @@ import javax.xml.bind.DatatypeConverter;
 
 import org.apache.tomcat.util.res.StringManager;
 
-public class WsWebSocketContainer implements WebSocketContainer {
+public class WsWebSocketContainer
+        implements WebSocketContainer, BackgroundProcess {
 
     private static final StringManager sm =
             StringManager.getManager(Constants.PACKAGE_NAME);
@@ -57,12 +59,15 @@ public class WsWebSocketContainer implem
 
     private final Map<Class<?>, Set<WsSession>> endpointSessionMap =
             new HashMap<>();
+    private final Map<WsSession,WsSession> sessions = new 
ConcurrentHashMap<>();
     private final Object endPointSessionMapLock = new Object();
 
     private long defaultAsyncTimeout = -1;
     private int maxBinaryMessageBufferSize = Constants.DEFAULT_BUFFER_SIZE;
     private int maxTextMessageBufferSize = Constants.DEFAULT_BUFFER_SIZE;
     private volatile long maxSessionIdleTimeout = 0;
+    private int backgroundProcessCount = 0;
+    private int processPeriod = 10;
 
     @Override
     public Session connectToServer(Class<?> annotatedEndpointClass, URI path)
@@ -168,6 +173,9 @@ public class WsWebSocketContainer implem
 
     protected void registerSession(Class<?> endpoint, WsSession wsSession) {
         synchronized (endPointSessionMapLock) {
+            if (endpointSessionMap.size() == 0) {
+                BackgroundProcessManager.getInstance().register(this);
+            }
             Set<WsSession> wsSessions = endpointSessionMap.get(endpoint);
             if (wsSessions == null) {
                 wsSessions = new HashSet<>();
@@ -175,6 +183,7 @@ public class WsWebSocketContainer implem
             }
             wsSessions.add(wsSession);
         }
+        sessions.put(wsSession, wsSession);
     }
 
 
@@ -187,13 +196,20 @@ public class WsWebSocketContainer implem
                     endpointSessionMap.remove(endpoint);
                 }
             }
+            if (endpointSessionMap.size() == 0) {
+                BackgroundProcessManager.getInstance().unregister(this);
+            }
         }
+        sessions.remove(wsSession);
     }
 
 
     Set<Session> getOpenSession(Class<?> endpoint) {
         HashSet<Session> result = new HashSet<>();
-        result.addAll(endpointSessionMap.get(endpoint));
+        Set<WsSession> sessions = endpointSessionMap.get(endpoint);
+        if (sessions != null) {
+            result.addAll(sessions);
+        }
         return result;
     }
 
@@ -459,4 +475,40 @@ public class WsWebSocketContainer implem
             return headers;
         }
     }
+
+
+    // ----------------------------------------------- BackgroundProcess 
methods
+
+    @Override
+    public void backgroundProcess() {
+        // This method gets called once a second.
+        backgroundProcessCount ++;
+
+        if (backgroundProcessCount >= processPeriod) {
+            backgroundProcessCount = 0;
+
+            for (WsSession wsSession : sessions.keySet()) {
+                wsSession.expire();
+            }
+        }
+
+    }
+
+
+    @Override
+    public void setProcessPeriod(int period) {
+        this.processPeriod = period;
+    }
+
+
+    /**
+     * {@inheritDoc}
+     *
+     * The default value is 10 which means session expirations are processed
+     * every 10 seconds.
+     */
+    @Override
+    public int getProcessPeriod() {
+        return processPeriod;
+    }
 }

Modified: 
tomcat/trunk/java/org/apache/tomcat/websocket/server/ServerContainerImpl.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/ServerContainerImpl.java?rev=1446217&r1=1446216&r2=1446217&view=diff
==============================================================================
--- 
tomcat/trunk/java/org/apache/tomcat/websocket/server/ServerContainerImpl.java 
(original)
+++ 
tomcat/trunk/java/org/apache/tomcat/websocket/server/ServerContainerImpl.java 
Thu Feb 14 14:50:48 2013
@@ -71,8 +71,7 @@ public class ServerContainerImpl extends
         return result;
     }
 
-    private final WsTimeout wsTimeout;
-    private final Thread timeoutThread;
+    private final WsWriteTimeout wsWriteTimeout = new WsWriteTimeout();
 
     private volatile ServletContext servletContext = null;
     private Map<String,ServerEndpointConfiguration> configMap =
@@ -82,14 +81,6 @@ public class ServerContainerImpl extends
             new ConcurrentHashMap<>();
 
 
-    private ServerContainerImpl() {
-        wsTimeout = new WsTimeout();
-        timeoutThread = new Thread(wsTimeout);
-        timeoutThread.setName(WsTimeout.THREAD_NAME_PREFIX + this);
-        timeoutThread.start();
-    }
-
-
     public void setServletContext(ServletContext servletContext) {
         if (this.servletContext == servletContext) {
             return;
@@ -109,10 +100,6 @@ public class ServerContainerImpl extends
         if (value != null) {
             setDefaultMaxTextMessageBufferSize(Integer.parseInt(value));
         }
-
-        // Update the timeout thread name
-        timeoutThread.setName(
-                WsTimeout.THREAD_NAME_PREFIX + 
servletContext.getContextPath());
     }
 
 
@@ -226,22 +213,8 @@ public class ServerContainerImpl extends
     }
 
 
-    protected WsTimeout getTimeout() {
-        return wsTimeout;
-    }
-
-
-    protected void stop() {
-        wsTimeout.stop();
-        int count = 0;
-        while (count < 50 && timeoutThread.isAlive()) {
-            count ++;
-            try {
-                Thread.sleep(100);
-            } catch (InterruptedException e) {
-                // Ignore
-            }
-        }
+    protected WsWriteTimeout getTimeout() {
+        return wsWriteTimeout;
     }
 
 

Modified: tomcat/trunk/java/org/apache/tomcat/websocket/server/WsListener.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/WsListener.java?rev=1446217&r1=1446216&r2=1446217&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/server/WsListener.java 
(original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/server/WsListener.java Thu 
Feb 14 14:50:48 2013
@@ -35,7 +35,6 @@ public class WsListener implements Servl
 
     @Override
     public void contextDestroyed(ServletContextEvent sce) {
-        ServerContainerImpl sc = ServerContainerImpl.getServerContainer();
-        sc.stop();
+        // NOOP
     }
 }

Modified: 
tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointServer.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointServer.java?rev=1446217&r1=1446216&r2=1446217&view=diff
==============================================================================
--- 
tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointServer.java
 (original)
+++ 
tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointServer.java
 Thu Feb 14 14:50:48 2013
@@ -42,7 +42,7 @@ public class WsRemoteEndpointServer exte
             LogFactory.getLog(WsProtocolHandler.class);
 
     private final ServletOutputStream sos;
-    private final WsTimeout wsTimeout;
+    private final WsWriteTimeout wsWriteTimeout;
     private volatile SendHandler handler = null;
     private volatile ByteBuffer[] buffers = null;
 
@@ -53,7 +53,7 @@ public class WsRemoteEndpointServer exte
     public WsRemoteEndpointServer(ServletOutputStream sos,
             ServerContainerImpl serverContainer) {
         this.sos = sos;
-        this.wsTimeout = serverContainer.getTimeout();
+        this.wsWriteTimeout = serverContainer.getTimeout();
     }
 
 
@@ -87,7 +87,7 @@ public class WsRemoteEndpointServer exte
                     }
                 }
                 if (complete) {
-                    wsTimeout.unregister(this);
+                    wsWriteTimeout.unregister(this);
                     if (close) {
                         close();
                     }
@@ -104,7 +104,7 @@ public class WsRemoteEndpointServer exte
             }
 
         } catch (IOException ioe) {
-            wsTimeout.unregister(this);
+            wsWriteTimeout.unregister(this);
             close();
             SendHandler sh = handler;
             handler = null;
@@ -117,7 +117,7 @@ public class WsRemoteEndpointServer exte
             if (timeout > 0) {
                 // Register with timeout thread
                 timeoutExpiry = timeout + System.currentTimeMillis();
-                wsTimeout.register(this);
+                wsWriteTimeout.register(this);
             }
         }
     }
@@ -132,7 +132,7 @@ public class WsRemoteEndpointServer exte
                 log.info(sm.getString("wsRemoteEndpointServer.closeFailed"), 
e);
             }
         }
-        wsTimeout.unregister(this);
+        wsWriteTimeout.unregister(this);
     }
 
 

Copied: 
tomcat/trunk/java/org/apache/tomcat/websocket/server/WsWriteTimeout.java (from 
r1446080, tomcat/trunk/java/org/apache/tomcat/websocket/server/WsTimeout.java)
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/WsWriteTimeout.java?p2=tomcat/trunk/java/org/apache/tomcat/websocket/server/WsWriteTimeout.java&p1=tomcat/trunk/java/org/apache/tomcat/websocket/server/WsTimeout.java&r1=1446080&r2=1446217&rev=1446217&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/server/WsTimeout.java 
(original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/server/WsWriteTimeout.java 
Thu Feb 14 14:50:48 2013
@@ -20,41 +20,32 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.Set;
 import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.tomcat.websocket.BackgroundProcess;
+import org.apache.tomcat.websocket.BackgroundProcessManager;
 
 /**
  * Provides timeouts for asynchronous web socket writes. On the server side we
  * only have access to {@link javax.servlet.ServletOutputStream} and
  * {@link javax.servlet.ServletInputStream} so there is no way to set a timeout
- * for writes to the client. Hence the separate thread.
+ * for writes to the client.
  */
-public class WsTimeout implements Runnable {
-
-    public static final String THREAD_NAME_PREFIX = "Websocket Timeout - ";
+public class WsWriteTimeout implements BackgroundProcess {
 
     private final Set<WsRemoteEndpointServer> endpoints =
             new ConcurrentSkipListSet<>(new EndpointComparator());
-    private volatile boolean running = true;
-
-    public void stop() {
-        running = false;
-        synchronized (this) {
-            this.notify();
-        }
-    }
-
+    private final AtomicInteger count = new AtomicInteger(0);
+    private int backgroundProcessCount = 0;
+    private volatile int processPeriod = 1;
 
     @Override
-    public void run() {
-        while (running) {
-            // Wait for one second - no need for timeouts more frequently than
-            // that
-            synchronized (this) {
-                try {
-                    wait(1000);
-                } catch (InterruptedException e) {
-                    // Ignore
-                }
-            }
+    public void backgroundProcess() {
+        // This method gets called once a second.
+        backgroundProcessCount ++;
+
+        if (backgroundProcessCount >= processPeriod) {
+            backgroundProcessCount = 0;
 
             long now = System.currentTimeMillis();
             Iterator<WsRemoteEndpointServer> iter = endpoints.iterator();
@@ -73,13 +64,43 @@ public class WsTimeout implements Runnab
     }
 
 
+    @Override
+    public void setProcessPeriod(int period) {
+        this.processPeriod = period;
+    }
+
+
+    /**
+     * {@inheritDoc}
+     *
+     * The default value is 1 which means asynchronous write timeouts are
+     * processed every 1 second.
+     */
+    @Override
+    public int getProcessPeriod() {
+        return processPeriod;
+    }
+
+
     public void register(WsRemoteEndpointServer endpoint) {
-        endpoints.add(endpoint);
+        boolean result = endpoints.add(endpoint);
+        if (result) {
+            int newCount = count.incrementAndGet();
+            if (newCount == 1) {
+                BackgroundProcessManager.getInstance().register(this);
+            }
+        }
     }
 
 
     public void unregister(WsRemoteEndpointServer endpoint) {
-        endpoints.remove(endpoint);
+        boolean result = endpoints.remove(endpoint);
+        if (result) {
+            int newCount = count.decrementAndGet();
+            if (newCount == 0) {
+                BackgroundProcessManager.getInstance().unregister(this);
+            }
+        }
     }
 
 

Modified: 
tomcat/trunk/test/org/apache/tomcat/websocket/TestWsWebSocketContainer.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/tomcat/websocket/TestWsWebSocketContainer.java?rev=1446217&r1=1446216&r2=1446217&view=diff
==============================================================================
--- tomcat/trunk/test/org/apache/tomcat/websocket/TestWsWebSocketContainer.java 
(original)
+++ tomcat/trunk/test/org/apache/tomcat/websocket/TestWsWebSocketContainer.java 
Thu Feb 14 14:50:48 2013
@@ -583,6 +583,94 @@ public class TestWsWebSocketContainer ex
     }
 
 
+    @Test
+    public void testSessionExpiryContainer() throws Exception {
+
+        Tomcat tomcat = getTomcatInstance();
+        // Must have a real docBase - just use temp
+        Context ctx =
+            tomcat.addContext("", System.getProperty("java.io.tmpdir"));
+        ctx.addApplicationListener(TesterEchoServer.Config.class.getName());
+
+        tomcat.start();
+
+        // Need access to implementation methods for configuring unit tests
+        WsWebSocketContainer wsContainer = (WsWebSocketContainer)
+                ContainerProvider.getWebSocketContainer();
+
+        // 5 second timeout
+        wsContainer.setMaxSessionIdleTimeout(5000);
+        wsContainer.setProcessPeriod(1);
+
+        connectToEchoServerBasic(wsContainer, EndpointA.class);
+        connectToEchoServerBasic(wsContainer, EndpointA.class);
+        Session s3a = connectToEchoServerBasic(wsContainer, EndpointA.class);
+
+        // Check all three sessions are open
+        Set<Session> setA = s3a.getOpenSessions();
+        Assert.assertEquals(3, setA.size());
+
+        int count = 0;
+        setA = s3a.getOpenSessions();
+        while (setA.size() > 0 && count < 8) {
+            count ++;
+            Thread.sleep(1000);
+            setA = s3a.getOpenSessions();
+        }
+
+        if (setA.size() > 0) {
+            Assert.fail("There were [" + setA.size() + "] open sessions");
+        }
+    }
+
+
+    @Test
+    public void testSessionExpirySession() throws Exception {
+
+        Tomcat tomcat = getTomcatInstance();
+        // Must have a real docBase - just use temp
+        Context ctx =
+            tomcat.addContext("", System.getProperty("java.io.tmpdir"));
+        ctx.addApplicationListener(TesterEchoServer.Config.class.getName());
+
+        tomcat.start();
+
+        // Need access to implementation methods for configuring unit tests
+        WsWebSocketContainer wsContainer = (WsWebSocketContainer)
+                ContainerProvider.getWebSocketContainer();
+
+        // 5 second timeout
+        wsContainer.setMaxSessionIdleTimeout(5000);
+        wsContainer.setProcessPeriod(1);
+
+        Session s1a = connectToEchoServerBasic(wsContainer, EndpointA.class);
+        s1a.setTimeout(3000);
+        Session s2a = connectToEchoServerBasic(wsContainer, EndpointA.class);
+        s2a.setTimeout(6000);
+        Session s3a = connectToEchoServerBasic(wsContainer, EndpointA.class);
+        s3a.setTimeout(9000);
+
+        // Check all three sessions are open
+        Set<Session> setA = s3a.getOpenSessions();
+
+        int expected = 3;
+        while (expected > 0) {
+            Assert.assertEquals(expected, setA.size());
+
+            int count = 0;
+            while (setA.size() == expected && count < 5) {
+                count ++;
+                Thread.sleep(1000);
+                setA = s3a.getOpenSessions();
+            }
+
+            expected--;
+        }
+
+        Assert.assertEquals(0, setA.size());
+    }
+
+
     private Session connectToEchoServerBasic(WebSocketContainer wsContainer,
             Class<? extends Endpoint> clazz) throws Exception {
         return wsContainer.connectToServer(clazz,



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to