Author: dejanb
Date: Wed Oct 13 11:41:16 2010
New Revision: 1022071

URL: http://svn.apache.org/viewvc?rev=1022071&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2948 - ajax support for multiple 
clients in the same session

Added:
    
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java
    
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxWebClient.java
Modified:
    activemq/trunk/activemq-web-demo/src/main/webapp/chat.html
    activemq/trunk/activemq-web-demo/src/main/webapp/js/amq.js
    activemq/trunk/activemq-web-demo/src/main/webapp/test/amq_test.html
    
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java

Modified: activemq/trunk/activemq-web-demo/src/main/webapp/chat.html
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-web-demo/src/main/webapp/chat.html?rev=1022071&r1=1022070&r2=1022071&view=diff
==============================================================================
--- activemq/trunk/activemq-web-demo/src/main/webapp/chat.html (original)
+++ activemq/trunk/activemq-web-demo/src/main/webapp/chat.html Wed Oct 13 
11:41:16 2010
@@ -55,7 +55,7 @@
                //     org.activemq.Chat.init();
                // }
                window.onload = function() {
-                       org.activemq.Amq.init({ uri: 'amq', logging: true, 
timeout: 45 });
+                       org.activemq.Amq.init({ uri: 'amq', logging: true, 
timeout: 45, clientId:(new Date()).getTime().toString() });
                        org.activemq.Chat.init();
                };
        </script>    

Modified: activemq/trunk/activemq-web-demo/src/main/webapp/js/amq.js
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-web-demo/src/main/webapp/js/amq.js?rev=1022071&r1=1022070&r2=1022071&view=diff
==============================================================================
--- activemq/trunk/activemq-web-demo/src/main/webapp/js/amq.js (original)
+++ activemq/trunk/activemq-web-demo/src/main/webapp/js/amq.js Wed Oct 13 
11:41:16 2010
@@ -70,6 +70,11 @@ org.activemq.Amq = function() {
        // message, messageType }.
        var messageQueue = [];
 
+  // String to distinguish this client from others sharing the same session.
+  // This can occur when multiple browser windows or tabs using amq.js 
simultaneously.
+  // All windows share the same JESSIONID, but need to consume messages 
independently.
+  var clientId = null;
+  
        /**
         * Iterate over the returned XML and for each message in the response, 
         * invoke the handler with the matching id.
@@ -138,9 +143,8 @@ org.activemq.Amq = function() {
                var data = 'timeout=' + timeout * 1000
                                 + '&d=' + now.getTime()
                                 + '&r=' + Math.random();
-                                
                var options = { method: 'get',
-                       data: data,
+                       data: addClientId( data ),
                        success: pollHandler,
                        error: pollErrorHandler};
                adapter.ajax(uri, options);
@@ -158,7 +162,7 @@ org.activemq.Amq = function() {
                } else {
                        org.activemq.Amq.startBatch();
                        adapter.ajax(uri, { method: 'post',
-                               data: buildParams( [message] ),
+                               data: addClientId( buildParams( [message] ) ),
                                error: errorHandler,
                                headers: headers,
                                success: org.activemq.Amq.endBatch});
@@ -181,18 +185,33 @@ org.activemq.Amq = function() {
                }
                return s.join('');
        }
+       
+       // add clientId to data if it exists, before passing data to ajax 
connection adapter.
+       var addClientId = function( data ) {
+               var output = data || '';
+               if( clientId ) {
+                       if( output.length > 0 ) {
+                               output += '&';
+                       }
+                       output += 'clientId='+clientId;
+               }
+               return output;
+       }
 
        return {
+               // optional clientId can be supplied to allow multiple clients 
(browser windows) within the same session.
                init : function(options) {
                        connectStatusHandler = options.connectStatusHandler || 
function(connected){};
                        uri = options.uri || '/amq';
                        pollDelay = typeof options.pollDelay == 'number' ? 
options.pollDelay : 0;
                        timeout = typeof options.timeout == 'number' ? 
options.timeout : 25;
                        logging = options.logging;
+                       clientId = options.clientId;
                        adapter.init(options);
                        sendPoll();
+                       
                },
-
+                   
                startBatch : function() {
                        batchInProgress = true;
                },
@@ -205,7 +224,7 @@ org.activemq.Amq = function() {
                                
                                // we need to ensure that messages which set 
headers are sent by themselves.
                                // if 2 'listen' messages were sent together, 
and a 'selector' header were added to one of them,
-                               //       AMQ would add the selector to both 
'listen' commands.
+                               // AMQ would add the selector to both 'listen' 
commands.
                                for(i=0;i<messageQueue.length;i++) {
                                        // a message with headers should always 
be sent by itself.      if other messages have been added, send this one later.
                                        if ( messageQueue[ i ].headers && 
messagesToSend.length == 0 ) {
@@ -223,7 +242,7 @@ org.activemq.Amq = function() {
                                adapter.ajax(uri, {
                                        method: 'post',
                                        headers: outgoingHeaders,
-                                       data: body,
+                                       data: addClientId( body ),
                                        success: org.activemq.Amq.endBatch, 
                                        error: errorHandler});
                        } else {

Modified: activemq/trunk/activemq-web-demo/src/main/webapp/test/amq_test.html
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-web-demo/src/main/webapp/test/amq_test.html?rev=1022071&r1=1022070&r2=1022071&view=diff
==============================================================================
--- activemq/trunk/activemq-web-demo/src/main/webapp/test/amq_test.html 
(original)
+++ activemq/trunk/activemq-web-demo/src/main/webapp/test/amq_test.html Wed Oct 
13 11:41:16 2010
@@ -282,6 +282,26 @@
       org.activemq.Amq.testPollHandler( response );
       
       assertEqual( 'test message', callbackValue.textContent );
+    }},
+    
+    testClientIdSpecifiedInInitIsAddedToAllAjaxRequests: function() { with( 
this ) {
+      // need to reset to remove the poll message sent when init() is called 
in setup().
+      org.activemq.AmqAdapter.reset();
+      org.activemq.Amq.init({ uri: '../amq', timeout: 30, 
clientId:'uniqueClientName' });
+      
+      org.activemq.Amq.addListener( 'id', 'queue://test', function(){} );
+      org.activemq.Amq.sendMessage( 'queue://test', '<message>test</message>' 
);
+      org.activemq.Amq.removeListener( 'id', 'topic://test' );      
+      org.activemq.Amq.endBatch();
+      
+      var requests = org.activemq.AmqAdapter.getRequests();
+      var clientNameRegex = /clientId=uniqueClientName/;
+      
+      assertEqual( 3, requests.length );
+      assertMatch( clientNameRegex, requests[0].options.data );
+      assertMatch( clientNameRegex, requests[1].options.data );
+      assertMatch( clientNameRegex, requests[2].options.data );
+      
     }}
     
   }); 

Added: 
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java?rev=1022071&view=auto
==============================================================================
--- 
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java
 (added)
+++ 
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java
 Wed Oct 13 11:41:16 2010
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.web;
+
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+
+import org.eclipse.jetty.continuation.Continuation;
+import org.eclipse.jetty.continuation.ContinuationSupport;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.activemq.MessageAvailableListener;
+
+/*
+ * Listen for available messages and wakeup any continuations.
+ */
+public class AjaxListener implements MessageAvailableListener {
+    private static final Log LOG = LogFactory.getLog(AjaxListener.class);
+    
+    private long maximumReadTimeout;
+    private AjaxWebClient client;
+    private long lastAccess;
+    private Continuation continuation;
+
+    AjaxListener(AjaxWebClient client, long maximumReadTimeout) {
+        this.client = client;
+        this.maximumReadTimeout = maximumReadTimeout;
+    }
+
+    public void access() {
+        lastAccess = System.currentTimeMillis();
+    }
+
+    public synchronized void setContinuation(Continuation continuation) {
+        this.continuation = continuation;
+    }
+
+    public synchronized void onMessageAvailable(MessageConsumer consumer) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("message for " + consumer + "continuation=" + 
continuation);
+        }
+        if (continuation != null) {
+            try {
+                Message message = consumer.receive(10);
+                continuation.setAttribute("message", message);
+                continuation.setAttribute("consumer", consumer);
+            } catch (Exception e) {
+                LOG.error("Error receiving message " + e, e);
+            }
+            continuation.resume();
+        } else if (System.currentTimeMillis() - lastAccess > 2 * 
this.maximumReadTimeout) {
+            new Thread() {
+                public void run() {
+                    client.closeConsumers();
+                };
+            }.start();
+        }
+    }
+}

Added: 
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxWebClient.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxWebClient.java?rev=1022071&view=auto
==============================================================================
--- 
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxWebClient.java
 (added)
+++ 
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxWebClient.java
 Wed Oct 13 11:41:16 2010
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.web;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Date;
+
+import javax.jms.MessageConsumer;
+import javax.servlet.http.HttpServletRequest;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.activemq.MessageAvailableConsumer;
+
+/*
+ * Collection of all data needed to fulfill requests from a single web client.
+ */
+public class AjaxWebClient extends WebClient {
+    private static final Log LOG = LogFactory.getLog(AjaxWebClient.class);
+    
+    // an instance which has not been accessed in this many milliseconds can 
be removed.
+    final long expireAfter = 60 * 1000;
+    
+    Map<MessageAvailableConsumer, String> idMap;
+    Map<MessageAvailableConsumer, String> destinationNameMap;
+    AjaxListener listener;
+    Long lastAccessed;
+    
+    public AjaxWebClient( HttpServletRequest request, long maximumReadTimeout 
) {
+        // 'id' meaning the first argument to the JavaScript addListener() 
function.
+        // used to indicate which JS callback should handle a given message.
+        this.idMap = new HashMap<MessageAvailableConsumer, String>();
+        
+        // map consumers to destinations like topic://test, etc.
+        this.destinationNameMap = new HashMap<MessageAvailableConsumer, 
String>();
+        
+        this.listener = new AjaxListener( this, maximumReadTimeout );
+        
+        this.lastAccessed = this.getNow();
+    }
+    
+    public Map<MessageAvailableConsumer, String> getIdMap() {
+        return this.idMap;
+    }
+    
+    public Map<MessageAvailableConsumer, String> getDestinationNameMap() {
+        return this.destinationNameMap;
+    }
+    
+    public AjaxListener getListener() {
+        return this.listener;
+    }
+    
+    public long getMillisSinceLastAccessed() {
+        return this.getNow() - this.lastAccessed;
+    }
+    
+    public void updateLastAccessed() {
+        this.lastAccessed = this.getNow();
+    }
+    
+    public boolean closeIfExpired() {
+        long now = (new Date()).getTime();
+        boolean returnVal = false;
+        if( this.getMillisSinceLastAccessed() > this.expireAfter ) {
+            this.close();
+            returnVal = true;
+        }
+        return returnVal;
+    }
+    
+    protected long getNow() {
+        return (new Date()).getTime();
+    }
+}

Modified: 
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java?rev=1022071&r1=1022070&r2=1022071&view=diff
==============================================================================
--- 
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java
 (original)
+++ 
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java
 Wed Oct 13 11:41:16 2010
@@ -23,6 +23,10 @@ import java.io.StringWriter;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.Timer;
+import java.util.TimerTask;
 
 import javax.jms.Destination;
 import javax.jms.JMSException;
@@ -66,12 +70,14 @@ import org.eclipse.jetty.continuation.Co
  */
 public class MessageListenerServlet extends MessageServletSupport {
     private static final Log LOG = 
LogFactory.getLog(MessageListenerServlet.class);
-
+    
     private String readTimeoutParameter = "timeout";
     private long defaultReadTimeout = -1;
     private long maximumReadTimeout = 25000;
     private int maximumMessages = 100;
-
+    private Timer clientCleanupTimer = new Timer();
+    private HashMap<String,AjaxWebClient> ajaxWebClients = new 
HashMap<String,AjaxWebClient>();
+    
     public void init() throws ServletException {
         ServletConfig servletConfig = getServletConfig();
         String name = servletConfig.getInitParameter("defaultReadTimeout");
@@ -86,8 +92,9 @@ public class MessageListenerServlet exte
         if (name != null) {
             maximumMessages = (int)asLong(name);
         }
+        clientCleanupTimer.schedule( new ClientCleaner(), 5000, 60000 );
     }
-
+    
     /**
      * Sends a message to a destination or manage subscriptions. If the the
      * content type of the POST is
@@ -110,14 +117,13 @@ public class MessageListenerServlet exte
     protected void doPost(HttpServletRequest request, HttpServletResponse 
response) throws ServletException, IOException {
 
         // lets turn the HTTP post into a JMS Message
-
-        WebClient client = WebClient.getWebClient(request);
+        AjaxWebClient client = getAjaxWebClient( request );
         String messageIds = "";
 
         synchronized (client) {
 
             if (LOG.isDebugEnabled()) {
-                LOG.debug("POST client=" + client + " session=" + 
request.getSession().getId() + " info=" + request.getPathInfo() + " 
contentType=" + request.getContentType());
+                LOG.debug("POST client=" + client + " session=" + 
request.getSession().getId() + " clientId="+ request.getParameter("clientId") + 
" info=" + request.getPathInfo() + " contentType=" + request.getContentType());
                 // dump(request.getParameterMap());
             }
 
@@ -151,27 +157,27 @@ public class MessageListenerServlet exte
                     messages++;
 
                     if ("listen".equals(type)) {
-                        Listener listener = getListener(request);
-                        Map<MessageAvailableConsumer, String> consumerIdMap = 
getConsumerIdMap(request);
-                        Map<MessageAvailableConsumer, String> 
consumerDestinationMap = getConsumerDestinationNameMap(request);
+                        AjaxListener listener = client.getListener();
+                        Map<MessageAvailableConsumer, String> consumerIdMap = 
client.getIdMap();
+                        Map<MessageAvailableConsumer, String> 
consumerDestinationNameMap = client.getDestinationNameMap();
                         client.closeConsumer(destination); // drop any existing
                         // consumer.
                         MessageAvailableConsumer consumer = 
(MessageAvailableConsumer)client.getConsumer(destination, 
request.getHeader(WebClient.selectorName));
 
                         consumer.setAvailableListener(listener);
                         consumerIdMap.put(consumer, message);
-                        consumerDestinationMap.put(consumer, destinationName);
+                        consumerDestinationNameMap.put(consumer, 
destinationName);
                         if (LOG.isDebugEnabled()) {
                             LOG.debug("Subscribed: " + consumer + " to " + 
destination + " id=" + message);
                         }
                     } else if ("unlisten".equals(type)) {
-                        Map<MessageAvailableConsumer, String> consumerIdMap = 
getConsumerIdMap(request);
-                        Map consumerDestinationMap = 
getConsumerDestinationNameMap(request);
+                        Map<MessageAvailableConsumer, String> consumerIdMap = 
client.getIdMap();
+                        Map consumerDestinationNameMap = 
client.getDestinationNameMap();
                         MessageAvailableConsumer consumer = 
(MessageAvailableConsumer)client.getConsumer(destination, 
request.getHeader(WebClient.selectorName));
 
                         consumer.setAvailableListener(null);
                         consumerIdMap.remove(consumer);
-                        consumerDestinationMap.remove(consumer);
+                        consumerDestinationNameMap.remove(consumer);
                         client.closeConsumer(destination);
                         if (LOG.isDebugEnabled()) {
                             LOG.debug("Unsubscribed: " + consumer);
@@ -233,9 +239,9 @@ public class MessageListenerServlet exte
      */
     protected void doGet(HttpServletRequest request, HttpServletResponse 
response) throws ServletException, IOException {
         try {
-            WebClient client = WebClient.getWebClient(request);
+            AjaxWebClient client = getAjaxWebClient(request);
             if (LOG.isDebugEnabled()) {
-                LOG.debug("GET client=" + client + " session=" + 
request.getSession().getId() + " uri=" + request.getRequestURI() + " query=" + 
request.getQueryString());
+                LOG.debug("GET client=" + client + " session=" + 
request.getSession().getId() + " clientId="+ request.getParameter("clientId") + 
" uri=" + request.getRequestURI() + " query=" + request.getQueryString());
             }
 
             doMessages(client, request, response);
@@ -253,7 +259,7 @@ public class MessageListenerServlet exte
      * @throws ServletException
      * @throws IOException
      */
-    protected void doMessages(WebClient client, HttpServletRequest request, 
HttpServletResponse response) throws JMSException, IOException {
+    protected void doMessages(AjaxWebClient client, HttpServletRequest 
request, HttpServletResponse response) throws JMSException, IOException {
 
         int messages = 0;
         // This is a poll for any messages
@@ -286,7 +292,11 @@ public class MessageListenerServlet exte
                     }
                 }
             }
-
+            
+            // prepare the response
+            response.setContentType("text/xml");
+            response.setHeader("Cache-Control", "no-cache");
+            
             if (message == null) {
                 Continuation continuation = 
ContinuationSupport.getContinuation(request);
                 
@@ -308,23 +318,19 @@ public class MessageListenerServlet exte
                 continuation.suspend();
                 
                 // Fetch the listeners
-                Listener listener = getListener(request);
+                AjaxListener listener = client.getListener();
 
                 // register this continuation with our listener.
                 listener.setContinuation(continuation);
                 
                 return;
             }
-            
-            // prepare the responds
-            response.setContentType("text/xml");
-            response.setHeader("Cache-Control", "no-cache");
 
             StringWriter swriter = new StringWriter();
             PrintWriter writer = new PrintWriter(swriter);
-
-            Map<MessageAvailableConsumer, String> consumerIdMap = 
getConsumerIdMap(request);
-            Map<MessageAvailableConsumer, String> consumerDestinationNameMap = 
getConsumerDestinationNameMap(request);
+            
+            Map<MessageAvailableConsumer, String> consumerIdMap = 
client.getIdMap();
+            Map<MessageAvailableConsumer, String> consumerDestinationNameMap = 
client.getDestinationNameMap();
             response.setStatus(HttpServletResponse.SC_OK);
             writer.println("<ajax-response>");
 
@@ -388,40 +394,35 @@ public class MessageListenerServlet exte
         }
         writer.println("</response>");
     }
-
-    protected Listener getListener(HttpServletRequest request) {
-        HttpSession session = request.getSession();
-        Listener listener = (Listener)session.getAttribute("mls.listener");
-        if (listener == null) {
-            listener = new Listener(WebClient.getWebClient(request));
-            session.setAttribute("mls.listener", listener);
-        }
-        return listener;
-    }
-
-    protected Map<MessageAvailableConsumer, String> 
getConsumerIdMap(HttpServletRequest request) {
+    
+    /*
+     * Return the AjaxWebClient for this session+clientId.
+     * Create one if it does not already exist.
+     */
+    protected AjaxWebClient getAjaxWebClient( HttpServletRequest request ) {
+        long now = (new Date()).getTime();
         HttpSession session = request.getSession(true);
-        Map<MessageAvailableConsumer, String> map = 
(Map<MessageAvailableConsumer, 
String>)session.getAttribute("mls.consumerIdMap");
-        if (map == null) {
-            map = new HashMap<MessageAvailableConsumer, String>();
-            session.setAttribute("mls.consumerIdMap", map);
+        
+        String clientId = request.getParameter( "clientId" );      
+        // if user doesn't supply a 'clientId', we'll just use a default.
+        if( clientId == null ) {
+            clientId = "defaultAjaxWebClient";
         }
-        return map;
-    }
-
-    protected Map<MessageAvailableConsumer, String> 
getConsumerDestinationNameMap(HttpServletRequest request) {
-        HttpSession session = request.getSession(true);
-        Map<MessageAvailableConsumer, String> map = 
(Map<MessageAvailableConsumer, 
String>)session.getAttribute("mls.consumerDestinationNameMap");
-        if (map == null) {
-            map = new HashMap<MessageAvailableConsumer, String>();
-            session.setAttribute("mls.consumerDestinationNameMap", map);
+        String sessionKey = session.getId() + '-' + clientId;
+        
+        AjaxWebClient client = ajaxWebClients.get( sessionKey );
+        synchronized (ajaxWebClients) {
+            // create a new AjaxWebClient if one does not already exist for 
this sessionKey.
+            if( client == null ) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug( "creating new AjaxWebClient in "+sessionKey );
+                }
+                client = new AjaxWebClient( request, maximumReadTimeout );
+                ajaxWebClients.put( sessionKey, client );
+            }
+            client.updateLastAccessed();
         }
-        return map;
-    }
-
-    protected boolean isRicoAjax(HttpServletRequest request) {
-        String rico = request.getParameter("rico");
-        return rico != null && rico.equals("true");
+        return client;
     }
 
     /**
@@ -440,48 +441,34 @@ public class MessageListenerServlet exte
         }
         return answer;
     }
-
+    
     /*
-     * Listen for available messages and wakeup any continuations.
+     * an instance of this class runs every minute (started in init), to clean 
up old web clients & free resources.
      */
-    private class Listener implements MessageAvailableListener {
-        WebClient client;
-        long lastAccess;
-        Continuation continuation;
-
-        Listener(WebClient client) {
-            this.client = client;
-        }
-
-        public void access() {
-            lastAccess = System.currentTimeMillis();
-        }
-
-        public synchronized void setContinuation(Continuation continuation) {
-            this.continuation = continuation;
-        }
-
-        public synchronized void onMessageAvailable(MessageConsumer consumer) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("message for " + consumer + "continuation=" + 
continuation);
+    private class ClientCleaner extends TimerTask {
+        public void run() {
+            if( LOG.isDebugEnabled() ) {
+                LOG.debug( "Cleaning up expired web clients." );
             }
-            if (continuation != null) {
-                    try {
-                        Message message = consumer.receive(10);
-                        continuation.setAttribute("message", message);
-                        continuation.setAttribute("consumer", consumer);
-                    } catch (Exception e) {
-                        LOG.error("Error receiving message " + e, e);
+            
+            synchronized( ajaxWebClients ) {
+                Iterator it = ajaxWebClients.entrySet().iterator();
+                while ( it.hasNext() ) {
+                    Map.Entry<String,AjaxWebClient> e = 
(Map.Entry<String,AjaxWebClient>)it.next();
+                    String key = e.getKey();
+                    AjaxWebClient val = e.getValue();
+                    if ( LOG.isDebugEnabled() ) {
+                        LOG.debug( "AjaxWebClient " + key + " last accessed " 
+ val.getMillisSinceLastAccessed()/1000 + " seconds ago." );
+                    }
+                    // close an expired client and remove it from the 
ajaxWebClients hash.
+                    if( val.closeIfExpired() ) {
+                        if ( LOG.isDebugEnabled() ) {
+                            LOG.debug( "Removing expired AjaxWebClient " + key 
);
+                        }
+                        it.remove();
                     }
-                    continuation.resume();
-            } else if (System.currentTimeMillis() - lastAccess > 2 * 
maximumReadTimeout) {
-                new Thread() {
-                    public void run() {
-                        client.closeConsumers();
-                    };
-                }.start();
+                }
             }
         }
-
     }
 }


Reply via email to